You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/02/17 10:44:39 UTC
[1/2] TAJO-305: Implement killQuery feature. (hyunsik)
Repository: incubator-tajo
Updated Branches:
refs/heads/master e2a7dffdb -> ae541ffae
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index bcdba43..7e1a9bd 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -20,6 +20,7 @@ package org.apache.tajo.master.querymaster;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
@@ -96,7 +97,11 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
private static final DiagnosticsUpdateTransition DIAGNOSTIC_UPDATE_TRANSITION = new DiagnosticsUpdateTransition();
private static final InternalErrorTransition INTERNAL_ERROR_TRANSITION = new InternalErrorTransition();
private static final ContainerLaunchTransition CONTAINER_LAUNCH_TRANSITION = new ContainerLaunchTransition();
- private static final FailedTransition FAILED_TRANSITION = new FailedTransition();
+ private static final TaskCompletedTransition TASK_COMPLETED_TRANSITION = new TaskCompletedTransition();
+ private static final AllocatedContainersCancelTransition CONTAINERS_CANCEL_TRANSITION =
+ new AllocatedContainersCancelTransition();
+ private static final SubQueryCompleteTransition SUBQUERY_COMPLETED_TRANSITION =
+ new SubQueryCompleteTransition();
private StateMachine<SubQueryState, SubQueryEventType, SubQueryEvent> stateMachine;
protected static final StateMachineFactory<SubQuery, SubQueryState,
@@ -106,107 +111,142 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
// Transitions from NEW state
.addTransition(SubQueryState.NEW,
- EnumSet.of(SubQueryState.INIT, SubQueryState.ERROR, SubQueryState.SUCCEEDED),
- SubQueryEventType.SQ_INIT, new InitAndRequestContainer())
+ EnumSet.of(SubQueryState.INITED, SubQueryState.ERROR, SubQueryState.SUCCEEDED),
+ SubQueryEventType.SQ_INIT,
+ new InitAndRequestContainer())
.addTransition(SubQueryState.NEW, SubQueryState.NEW,
SubQueryEventType.SQ_DIAGNOSTIC_UPDATE,
DIAGNOSTIC_UPDATE_TRANSITION)
+ .addTransition(SubQueryState.NEW, SubQueryState.KILLED,
+ SubQueryEventType.SQ_KILL)
.addTransition(SubQueryState.NEW, SubQueryState.ERROR,
SubQueryEventType.SQ_INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
- // Transitions from INIT state
- .addTransition(SubQueryState.INIT, SubQueryState.CONTAINER_ALLOCATED,
+ // Transitions from INITED state
+ .addTransition(SubQueryState.INITED, SubQueryState.RUNNING,
SubQueryEventType.SQ_CONTAINER_ALLOCATED,
CONTAINER_LAUNCH_TRANSITION)
- .addTransition(SubQueryState.INIT, SubQueryState.INIT,
+ .addTransition(SubQueryState.INITED, SubQueryState.INITED,
SubQueryEventType.SQ_DIAGNOSTIC_UPDATE,
DIAGNOSTIC_UPDATE_TRANSITION)
- .addTransition(SubQueryState.NEW, SubQueryState.ERROR,
+ .addTransition(SubQueryState.INITED, SubQueryState.KILL_WAIT,
+ SubQueryEventType.SQ_KILL)
+ .addTransition(SubQueryState.INITED, SubQueryState.ERROR,
SubQueryEventType.SQ_INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
- // Transitions from CONTAINER_ALLOCATED state
- .addTransition(SubQueryState.CONTAINER_ALLOCATED,
- EnumSet.of(SubQueryState.RUNNING, SubQueryState.FAILED, SubQueryState.SUCCEEDED),
- SubQueryEventType.SQ_START,
- new StartTransition())
- .addTransition(SubQueryState.CONTAINER_ALLOCATED, SubQueryState.CONTAINER_ALLOCATED,
+ // Transitions from RUNNING state
+ .addTransition(SubQueryState.RUNNING, SubQueryState.RUNNING,
SubQueryEventType.SQ_CONTAINER_ALLOCATED,
CONTAINER_LAUNCH_TRANSITION)
- .addTransition(SubQueryState.CONTAINER_ALLOCATED, SubQueryState.CONTAINER_ALLOCATED,
+ .addTransition(SubQueryState.RUNNING, SubQueryState.RUNNING,
+ SubQueryEventType.SQ_TASK_COMPLETED,
+ TASK_COMPLETED_TRANSITION)
+ .addTransition(SubQueryState.RUNNING,
+ EnumSet.of(SubQueryState.SUCCEEDED, SubQueryState.FAILED),
+ SubQueryEventType.SQ_SUBQUERY_COMPLETED,
+ SUBQUERY_COMPLETED_TRANSITION)
+ .addTransition(SubQueryState.RUNNING, SubQueryState.RUNNING,
+ SubQueryEventType.SQ_FAILED,
+ TASK_COMPLETED_TRANSITION)
+ .addTransition(SubQueryState.RUNNING, SubQueryState.RUNNING,
SubQueryEventType.SQ_DIAGNOSTIC_UPDATE,
DIAGNOSTIC_UPDATE_TRANSITION)
- .addTransition(SubQueryState.CONTAINER_ALLOCATED, SubQueryState.ERROR,
+ .addTransition(SubQueryState.RUNNING, SubQueryState.KILL_WAIT,
+ SubQueryEventType.SQ_KILL,
+ new KillTasksTransition())
+ .addTransition(SubQueryState.RUNNING, SubQueryState.ERROR,
SubQueryEventType.SQ_INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
-
- // Transitions from RUNNING state
- .addTransition(SubQueryState.RUNNING, SubQueryState.RUNNING,
- SubQueryEventType.SQ_CONTAINER_ALLOCATED,
- CONTAINER_LAUNCH_TRANSITION)
+ // Ignore-able Transition
.addTransition(SubQueryState.RUNNING, SubQueryState.RUNNING,
SubQueryEventType.SQ_START)
- .addTransition(SubQueryState.RUNNING, SubQueryState.RUNNING,
+
+ // Transitions from KILL_WAIT state
+ .addTransition(SubQueryState.KILL_WAIT, SubQueryState.KILL_WAIT,
+ SubQueryEventType.SQ_CONTAINER_ALLOCATED,
+ CONTAINERS_CANCEL_TRANSITION)
+ .addTransition(SubQueryState.KILL_WAIT, SubQueryState.KILL_WAIT,
+ EnumSet.of(SubQueryEventType.SQ_KILL))
+ .addTransition(SubQueryState.KILL_WAIT, SubQueryState.KILL_WAIT,
SubQueryEventType.SQ_TASK_COMPLETED,
- new TaskCompletedTransition())
- .addTransition(SubQueryState.RUNNING, SubQueryState.SUCCEEDED,
+ TASK_COMPLETED_TRANSITION)
+ .addTransition(SubQueryState.KILL_WAIT,
+ EnumSet.of(SubQueryState.SUCCEEDED, SubQueryState.FAILED, SubQueryState.KILLED),
SubQueryEventType.SQ_SUBQUERY_COMPLETED,
- new SubQueryCompleteTransition())
- .addTransition(SubQueryState.RUNNING, SubQueryState.RUNNING,
+ SUBQUERY_COMPLETED_TRANSITION)
+ .addTransition(SubQueryState.KILL_WAIT, SubQueryState.KILL_WAIT,
SubQueryEventType.SQ_DIAGNOSTIC_UPDATE,
DIAGNOSTIC_UPDATE_TRANSITION)
- .addTransition(SubQueryState.RUNNING, SubQueryState.FAILED,
+ .addTransition(SubQueryState.KILL_WAIT, SubQueryState.KILL_WAIT,
SubQueryEventType.SQ_FAILED,
- FAILED_TRANSITION)
- .addTransition(SubQueryState.RUNNING, SubQueryState.ERROR,
+ TASK_COMPLETED_TRANSITION)
+ .addTransition(SubQueryState.KILL_WAIT, SubQueryState.ERROR,
SubQueryEventType.SQ_INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
- // Transitions from SUCCEEDED state
+ // Transitions from SUCCEEDED state
.addTransition(SubQueryState.SUCCEEDED, SubQueryState.SUCCEEDED,
- SubQueryEventType.SQ_START)
- .addTransition(SubQueryState.SUCCEEDED, SubQueryState.SUCCEEDED,
- SubQueryEventType.SQ_CONTAINER_ALLOCATED)
+ SubQueryEventType.SQ_CONTAINER_ALLOCATED,
+ CONTAINERS_CANCEL_TRANSITION)
.addTransition(SubQueryState.SUCCEEDED, SubQueryState.SUCCEEDED,
SubQueryEventType.SQ_DIAGNOSTIC_UPDATE,
DIAGNOSTIC_UPDATE_TRANSITION)
.addTransition(SubQueryState.SUCCEEDED, SubQueryState.ERROR,
SubQueryEventType.SQ_INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
+ // Ignore-able events
+ .addTransition(SubQueryState.SUCCEEDED, SubQueryState.SUCCEEDED,
+ EnumSet.of(
+ SubQueryEventType.SQ_START,
+ SubQueryEventType.SQ_KILL,
+ SubQueryEventType.SQ_CONTAINER_ALLOCATED))
// Transitions from FAILED state
.addTransition(SubQueryState.FAILED, SubQueryState.FAILED,
+ SubQueryEventType.SQ_CONTAINER_ALLOCATED,
+ CONTAINERS_CANCEL_TRANSITION)
+ .addTransition(SubQueryState.FAILED, SubQueryState.FAILED,
SubQueryEventType.SQ_DIAGNOSTIC_UPDATE,
DIAGNOSTIC_UPDATE_TRANSITION)
- .addTransition(SubQueryState.FAILED, SubQueryState.FAILED,
- SubQueryEventType.SQ_START)
- .addTransition(SubQueryState.FAILED, SubQueryState.FAILED,
- SubQueryEventType.SQ_CONTAINER_ALLOCATED)
- .addTransition(SubQueryState.FAILED, SubQueryState.FAILED,
- SubQueryEventType.SQ_FAILED)
.addTransition(SubQueryState.FAILED, SubQueryState.ERROR,
SubQueryEventType.SQ_INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
+ // Ignore-able transitions
+ .addTransition(SubQueryState.FAILED, SubQueryState.FAILED,
+ EnumSet.of(
+ SubQueryEventType.SQ_START,
+ SubQueryEventType.SQ_KILL,
+ SubQueryEventType.SQ_CONTAINER_ALLOCATED,
+ SubQueryEventType.SQ_FAILED))
// Transitions from FAILED state
.addTransition(SubQueryState.ERROR, SubQueryState.ERROR,
+ SubQueryEventType.SQ_CONTAINER_ALLOCATED,
+ CONTAINERS_CANCEL_TRANSITION)
+ .addTransition(SubQueryState.ERROR, SubQueryState.ERROR,
SubQueryEventType.SQ_DIAGNOSTIC_UPDATE,
DIAGNOSTIC_UPDATE_TRANSITION)
+ // Ignore-able transitions
.addTransition(SubQueryState.ERROR, SubQueryState.ERROR,
- SubQueryEventType.SQ_FAILED)
- .addTransition(SubQueryState.ERROR, SubQueryState.ERROR,
- SubQueryEventType.SQ_INTERNAL_ERROR)
+ EnumSet.of(
+ SubQueryEventType.SQ_START,
+ SubQueryEventType.SQ_KILL,
+ SubQueryEventType.SQ_FAILED,
+ SubQueryEventType.SQ_INTERNAL_ERROR))
.installTopology();
-
private final Lock readLock;
private final Lock writeLock;
private int totalScheduledObjectsCount;
- private int completedObjectCount = 0;
+ private int succeededObjectCount = 0;
private int completedTaskCount = 0;
+ private int succeededTaskCount = 0;
+ private int killedObjectCount = 0;
+ private int failedObjectCount = 0;
private TaskSchedulerContext schedulerContext;
public SubQuery(QueryMasterTask.QueryMasterTaskContext context, MasterPlan masterPlan, ExecutionBlock block, AbstractStorageManager sm) {
@@ -223,8 +263,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
}
public static boolean isRunningState(SubQueryState state) {
- return state == SubQueryState.INIT || state == SubQueryState.NEW ||
- state == SubQueryState.CONTAINER_ALLOCATED || state == SubQueryState.RUNNING;
+ return state == SubQueryState.INITED || state == SubQueryState.NEW || state == SubQueryState.RUNNING;
}
public QueryMasterTask.QueryMasterTaskContext getContext() {
@@ -271,15 +310,15 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
if (getState() == SubQueryState.NEW) {
return 0;
} else {
- return (float)(completedObjectCount) / (float)totalScheduledObjectsCount;
+ return (float)(succeededObjectCount) / (float)totalScheduledObjectsCount;
}
} finally {
readLock.unlock();
}
}
- public int getCompletedObjectCount() {
- return completedObjectCount;
+ public int getSucceededObjectCount() {
+ return succeededObjectCount;
}
public int getTotalScheduledObjectsCount() {
@@ -294,15 +333,29 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
tasks.put(task.getId(), task);
}
- public void abortSubQuery(SubQueryState finalState) {
+ /**
+ * It finalizes this subquery. It is invoked only when the subquery has succeeded.
+ */
+ public void complete() {
+ cleanup();
+ finalizeStats();
+ setFinishTime();
+ eventHandler.handle(new SubQueryCompletedEvent(getId(), SubQueryState.SUCCEEDED));
+ }
+
+ /**
+ * It finalizes this subquery. Unlike {@link SubQuery#complete()},
+ * it is invoked when a subquery finishes abnormally.
+ *
+ * @param finalState The final subquery state
+ */
+ public void abort(SubQueryState finalState) {
// TODO -
// - committer.abortSubQuery(...)
// - record SubQuery Finish Time
// - CleanUp Tasks
// - Record History
-
- stopScheduler();
- releaseContainers();
+ cleanup();
setFinishTime();
eventHandler.handle(new SubQueryCompletedEvent(getId(), finalState));
}
@@ -394,7 +447,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
}
}
- public static TableStats computeStatFromUnionBlock(SubQuery subQuery) {
+ private static TableStats computeStatFromUnionBlock(SubQuery subQuery) {
TableStats stat = new TableStats();
TableStats childStat;
long avgRows = 0, numBytes = 0, numRows = 0;
@@ -443,10 +496,13 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
private void releaseContainers() {
// If there are still live TaskRunners, try to kill the containers.
- eventHandler.handle(new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_CLEANUP ,getId(), containers.values()));
+ eventHandler.handle(new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_CLEANUP, getId(), containers.values()));
}
- private void finish() {
+ /**
+ * It computes all stats and sets the intermediate result.
+ */
+ private void finalizeStats() {
TableStats stats;
if (block.hasUnion()) {
stats = computeStatFromUnionBlock(this);
@@ -466,9 +522,6 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
schema = channel.getSchema();
meta = CatalogUtil.newTableMeta(storeType, new Options());
statistics = stats;
- setFinishTime();
-
- eventHandler.handle(new SubQuerySucceeEvent(getId()));
}
@Override
@@ -516,7 +569,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
try {
// Union operator does not require actual query processing. It is performed logically.
if (execBlock.hasUnion()) {
- subQuery.finish();
+ subQuery.finalizeStats();
state = SubQueryState.SUCCEEDED;
} else {
ExecutionBlock parent = subQuery.getMasterPlan().getParent(subQuery.getBlock());
@@ -529,12 +582,13 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
if (subQuery.getTaskScheduler().remainingScheduledObjectNum() == 0) { // if there is no tasks
subQuery.stopScheduler();
- subQuery.finish();
+ subQuery.finalizeStats();
+ subQuery.eventHandler.handle(new SubQueryCompletedEvent(subQuery.getId(), SubQueryState.SUCCEEDED));
return SubQueryState.SUCCEEDED;
} else {
subQuery.taskScheduler.start();
allocateContainers(subQuery);
- return SubQueryState.INIT;
+ return SubQueryState.INITED;
}
}
} catch (Exception e) {
@@ -826,7 +880,6 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
return unit;
}
- int i = 0;
private static class ContainerLaunchTransition
implements SingleArcTransition<SubQuery, SubQueryEvent> {
@@ -838,42 +891,51 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
for (Container container : allocationEvent.getAllocatedContainer()) {
ContainerId cId = container.getId();
if (subQuery.containers.containsKey(cId)) {
- LOG.info(">>>>>>>>>>>> Duplicate Container! <<<<<<<<<<<");
+ subQuery.eventHandler.handle(new SubQueryDiagnosticsUpdateEvent(subQuery.getId(),
+ "Duplicated containers are allocated: " + cId.toString()));
+ subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_INTERNAL_ERROR));
}
subQuery.containers.put(cId, container);
- // TODO - This is debugging message. Should be removed
- subQuery.i++;
}
- LOG.info("SubQuery (" + subQuery.getId() + ") has " + subQuery.i + " containers!");
+ LOG.info("SubQuery (" + subQuery.getId() + ") has " + subQuery.containers.size() + " containers!");
subQuery.eventHandler.handle(
new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_LAUNCH,
subQuery.getId(), allocationEvent.getAllocatedContainer()));
subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_START));
} catch (Throwable t) {
-
+ subQuery.eventHandler.handle(new SubQueryDiagnosticsUpdateEvent(subQuery.getId(),
+ ExceptionUtils.getStackTrace(t)));
+ subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_INTERNAL_ERROR));
}
}
}
- private static class StartTransition implements
- MultipleArcTransition<SubQuery, SubQueryEvent, SubQueryState> {
-
+ /**
+ * It handles a container-allocated event in the KILL_WAIT, SUCCEEDED, FAILED, and ERROR states.
+ * It just returns the newly allocated containers by requesting their remote cleanup.
+ */
+ private static class AllocatedContainersCancelTransition implements SingleArcTransition<SubQuery, SubQueryEvent> {
@Override
- public SubQueryState transition(SubQuery subQuery,
- SubQueryEvent subQueryEvent) {
- // schedule tasks
+ public void transition(SubQuery subQuery, SubQueryEvent event) {
try {
- return SubQueryState.RUNNING;
- } catch (Exception e) {
- LOG.warn("SubQuery (" + subQuery.getId() + ") failed", e);
- return SubQueryState.FAILED;
+ SubQueryContainerAllocationEvent allocationEvent =
+ (SubQueryContainerAllocationEvent) event;
+ subQuery.eventHandler.handle(
+ new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_CLEANUP,
+ subQuery.getId(), allocationEvent.getAllocatedContainer()));
+ LOG.info(String.format("[%s] %d allocated containers are canceled",
+ subQuery.getId().toString(),
+ allocationEvent.getAllocatedContainer().size()));
+ } catch (Throwable t) {
+ subQuery.eventHandler.handle(new SubQueryDiagnosticsUpdateEvent(subQuery.getId(),
+ ExceptionUtils.getStackTrace(t)));
+ subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_INTERNAL_ERROR));
}
}
}
- private static class TaskCompletedTransition
- implements SingleArcTransition<SubQuery, SubQueryEvent> {
+ private static class TaskCompletedTransition implements SingleArcTransition<SubQuery, SubQueryEvent> {
@Override
public void transition(SubQuery subQuery,
@@ -882,39 +944,101 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
QueryUnit task = subQuery.getQueryUnit(taskEvent.getTaskId());
if (task == null) { // task failed
+ LOG.error(String.format("Task %s is absent", taskEvent.getTaskId()));
subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_FAILED));
} else {
- QueryUnitAttempt taskAttempt = task.getSuccessfulAttempt();
- if (task.isLeafTask()) {
- subQuery.completedObjectCount += task.getTotalFragmentNum();
- } else {
- subQuery.completedObjectCount++;
- }
subQuery.completedTaskCount++;
- LOG.info(subQuery.getId() + " SubQuery Succeeded " + subQuery.completedTaskCount + "/"
- + subQuery.schedulerContext.getEstimatedTaskNum() + " on " + taskAttempt.getHost() + ":" + taskAttempt.getPort());
- if (subQuery.taskScheduler.remainingScheduledObjectNum() == 0
- && subQuery.totalScheduledObjectsCount == subQuery.completedObjectCount) {
- subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(),
- SubQueryEventType.SQ_SUBQUERY_COMPLETED));
+ if (taskEvent.getState() == TaskState.SUCCEEDED) {
+ if (task.isLeafTask()) {
+ subQuery.succeededObjectCount += task.getTotalFragmentNum();
+ } else {
+ subQuery.succeededObjectCount++;
+ }
+ } else if (task.getState() == TaskState.KILLED) {
+ if (task.isLeafTask()) {
+ subQuery.killedObjectCount += task.getTotalFragmentNum();
+ } else {
+ subQuery.killedObjectCount++;
+ }
+ } else if (task.getState() == TaskState.FAILED) {
+ if (task.isLeafTask()) {
+ subQuery.failedObjectCount+= task.getTotalFragmentNum();
+ } else {
+ subQuery.failedObjectCount++;
+ }
+
+ // If at least one task has failed, try to kill all tasks.
+ subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_KILL));
+ }
+
+ LOG.info(String.format("[%s] Task Completion Event (Total: %d, Success: %d, Killed: %d, Failed: %d",
+ subQuery.getId(),
+ subQuery.getTotalScheduledObjectsCount(),
+ subQuery.succeededObjectCount,
+ subQuery.killedObjectCount,
+ subQuery.failedObjectCount));
+
+ if (subQuery.totalScheduledObjectsCount ==
+ subQuery.succeededObjectCount + subQuery.killedObjectCount + subQuery.failedObjectCount) {
+ subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_SUBQUERY_COMPLETED));
}
}
}
}
- private static class SubQueryCompleteTransition
- implements SingleArcTransition<SubQuery, SubQueryEvent> {
+ private static class KillTasksTransition implements SingleArcTransition<SubQuery, SubQueryEvent> {
@Override
public void transition(SubQuery subQuery, SubQueryEvent subQueryEvent) {
+ subQuery.getTaskScheduler().stop();
+ for (QueryUnit queryUnit : subQuery.getQueryUnits()) {
+ subQuery.eventHandler.handle(new TaskEvent(queryUnit.getId(), TaskEventType.T_KILL));
+ }
+ }
+ }
+
+ private void cleanup() {
+ stopScheduler();
+ releaseContainers();
+ }
+
+ private static class SubQueryCompleteTransition
+ implements MultipleArcTransition<SubQuery, SubQueryEvent, SubQueryState> {
+
+ @Override
+ public SubQueryState transition(SubQuery subQuery, SubQueryEvent subQueryEvent) {
// TODO - Commit subQuery & do cleanup
// TODO - records succeeded, failed, killed completed task
// TODO - records metrics
- LOG.info("SubQuery finished:" + subQuery.getId());
- subQuery.stopScheduler();
- subQuery.releaseContainers();
- subQuery.finish();
+ try {
+ LOG.info(String.format("subQuery completed - %s (total=%d, success=%d, killed=%d)",
+ subQuery.getId().toString(),
+ subQuery.getTotalScheduledObjectsCount(),
+ subQuery.getSucceededObjectCount(),
+ subQuery.killedObjectCount));
+
+ if (subQuery.killedObjectCount > 0 || subQuery.failedObjectCount > 0) {
+ if (subQuery.failedObjectCount > 0) {
+ subQuery.abort(SubQueryState.FAILED);
+ return SubQueryState.FAILED;
+ } else if (subQuery.killedObjectCount > 0) {
+ subQuery.abort(SubQueryState.KILLED);
+ return SubQueryState.KILLED;
+ } else {
+ LOG.error("Invalid State " + subQuery.getState() + " State");
+ subQuery.abort(SubQueryState.ERROR);
+ return SubQueryState.ERROR;
+ }
+ } else {
+ subQuery.complete();
+ return SubQueryState.SUCCEEDED;
+ }
+ } catch (Throwable t) {
+ LOG.error(t);
+ subQuery.abort(SubQueryState.ERROR);
+ return SubQueryState.ERROR;
+ }
}
}
@@ -928,14 +1052,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
private static class InternalErrorTransition implements SingleArcTransition<SubQuery, SubQueryEvent> {
@Override
public void transition(SubQuery subQuery, SubQueryEvent subQueryEvent) {
- subQuery.abortSubQuery(SubQueryState.ERROR);
- }
- }
-
- private static class FailedTransition implements SingleArcTransition<SubQuery, SubQueryEvent> {
- @Override
- public void transition(SubQuery subQuery, SubQueryEvent subQueryEvent) {
- subQuery.abortSubQuery(SubQueryState.FAILED);
+ subQuery.abort(SubQueryState.ERROR);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQueryState.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQueryState.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQueryState.java
index ce4d209..effcfde 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQueryState.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQueryState.java
@@ -20,10 +20,11 @@ package org.apache.tajo.master.querymaster;
public enum SubQueryState {
NEW,
- CONTAINER_ALLOCATED,
- INIT,
+ INITED,
RUNNING,
SUCCEEDED,
FAILED,
+ KILL_WAIT,
+ KILLED,
ERROR
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
index 16427b6..28386bb 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
@@ -21,10 +21,6 @@ package org.apache.tajo.worker;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -53,7 +49,6 @@ import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.rpc.RpcConnectionPool;
import org.apache.tajo.util.ApplicationIdUtils;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@@ -63,12 +58,10 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
public class TajoResourceAllocator extends AbstractResourceAllocator {
private static final Log LOG = LogFactory.getLog(TajoResourceAllocator.class);
- static AtomicInteger containerIdSeq = new AtomicInteger(0);
private TajoConf tajoConf;
private QueryMasterTask.QueryMasterTaskContext queryTaskContext;
private final ExecutorService executorService;
@@ -110,7 +103,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
tajoConf = (TajoConf)conf;
queryTaskContext.getDispatcher().register(TaskRunnerGroupEvent.EventType.class, new TajoTaskRunnerLauncher());
-//
+
queryTaskContext.getDispatcher().register(ContainerAllocatorEventType.class, new TajoWorkerAllocationHandler());
super.init(conf);
@@ -140,23 +133,6 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
super.start();
}
- final public static FsPermission QUERYCONF_FILE_PERMISSION =
- FsPermission.createImmutable((short) 0644); // rw-r--r--
-
- private static void writeConf(Configuration conf, Path queryConfFile)
- throws IOException {
- // Write job file to Tajo's fs
- FileSystem fs = queryConfFile.getFileSystem(conf);
- FSDataOutputStream out =
- FileSystem.create(fs, queryConfFile,
- new FsPermission(QUERYCONF_FILE_PERMISSION));
- try {
- conf.writeXml(out);
- } finally {
- out.close();
- }
- }
-
class TajoTaskRunnerLauncher implements TaskRunnerLauncher {
@Override
public void handle(TaskRunnerGroupEvent event) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
index da3dc34..a73623f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
@@ -130,7 +130,7 @@ public class TajoWorkerClientService extends AbstractService {
RpcController controller,
ClientProtos.GetQueryResultRequest request) throws ServiceException {
QueryId queryId = new QueryId(request.getQueryId());
- Query query = workerContext.getQueryMaster().getQuery(queryId);
+ Query query = workerContext.getQueryMaster().getQueryMasterTask(queryId, true).getQuery();
ClientProtos.GetQueryResultResponse.Builder builder = ClientProtos.GetQueryResultResponse.newBuilder();
try {
@@ -171,31 +171,35 @@ public class TajoWorkerClientService extends AbstractService {
builder.setState(TajoProtos.QueryState.QUERY_SUCCEEDED);
} else {
QueryMasterTask queryMasterTask = workerContext.getQueryMaster().getQueryMasterTask(queryId);
+
builder.setResultCode(ClientProtos.ResultCode.OK);
builder.setQueryMasterHost(bindAddr.getHostName());
builder.setQueryMasterPort(bindAddr.getPort());
- if (queryMasterTask != null) {
- queryMasterTask.touchSessionTime();
- Query query = queryMasterTask.getQuery();
-
- builder.setState(query.getState());
- builder.setProgress(query.getProgress());
- builder.setSubmitTime(query.getAppSubmitTime());
- builder.setHasResult(
- !(queryMasterTask.getQueryTaskContext().getQueryContext().isCreateTable() ||
- queryMasterTask.getQueryTaskContext().getQueryContext().isInsert())
- );
- if (query.getState() == TajoProtos.QueryState.QUERY_SUCCEEDED) {
- builder.setFinishTime(query.getFinishTime());
- } else {
- builder.setFinishTime(System.currentTimeMillis());
- }
- } else {
+ if (queryMasterTask == null) {
+ queryMasterTask = workerContext.getQueryMaster().getQueryMasterTask(queryId, true);
+ }
+ if (queryMasterTask == null) {
builder.setState(TajoProtos.QueryState.QUERY_NOT_ASSIGNED);
+ return builder.build();
}
- }
+ queryMasterTask.touchSessionTime();
+ Query query = queryMasterTask.getQuery();
+
+ builder.setState(query.getState());
+ builder.setProgress(query.getProgress());
+ builder.setSubmitTime(query.getAppSubmitTime());
+ builder.setHasResult(
+ !(queryMasterTask.getQueryTaskContext().getQueryContext().isCreateTable() ||
+ queryMasterTask.getQueryTaskContext().getQueryContext().isInsert())
+ );
+ if (query.getState() == TajoProtos.QueryState.QUERY_SUCCEEDED) {
+ builder.setFinishTime(query.getFinishTime());
+ } else {
+ builder.setFinishTime(System.currentTimeMillis());
+ }
+ }
return builder.build();
}
@@ -205,12 +209,6 @@ public class TajoWorkerClientService extends AbstractService {
TajoIdProtos.QueryIdProto request) throws ServiceException {
final QueryId queryId = new QueryId(request);
LOG.info("Stop Query:" + queryId);
- Thread t = new Thread() {
- public void run() {
- workerContext.getQueryMaster().getContext().stopQuery(queryId);
- }
- };
- t.start();
return BOOL_TRUE;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
index c770696..392a7cf 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
@@ -26,6 +26,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.CompositeService;
import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryUnitAttemptId;
import org.apache.tajo.TajoIdProtos;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.ipc.TajoWorkerProtocol;
@@ -136,6 +137,13 @@ public class TajoWorkerManagerService extends CompositeService
}
@Override
+ public void killTaskAttempt(RpcController controller, TajoIdProtos.QueryUnitAttemptIdProto request,
+ RpcCallback<PrimitiveProtos.BoolProto> done) {
+ workerContext.getTaskRunnerManager().findTaskByQueryUnitAttemptId(new QueryUnitAttemptId(request)).kill();
+ done.run(TajoWorker.TRUE_PROTO);
+ }
+
+ @Override
public void cleanup(RpcController controller, TajoIdProtos.QueryIdProto request,
RpcCallback<PrimitiveProtos.BoolProto> done) {
workerContext.cleanup(new QueryId(request).toString());
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
index c31e9cd..066e11c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
@@ -91,9 +91,10 @@ public class Task {
private final Reporter reporter;
private Path inputTableBaseDir;
- private static int completed = 0;
- private static int failed = 0;
- private static int succeeded = 0;
+ private static int completedTasksNum = 0;
+ private static int succeededTasksNum = 0;
+ private static int killedTasksNum = 0;
+ private static int failedTasksNum = 0;
private long startTime;
private long finishTime;
@@ -282,7 +283,6 @@ public class Task {
public void kill() {
killed = true;
context.stop();
- context.setState(TaskAttemptState.TA_KILLED);
setProgressFlag();
releaseChannelFactory();
}
@@ -290,7 +290,6 @@ public class Task {
public void abort() {
aborted = true;
context.stop();
- context.setState(TaskAttemptState.TA_FAILED);
releaseChannelFactory();
}
@@ -372,7 +371,7 @@ public class Task {
this.executor = taskRunnerContext.getTQueryEngine().
createPlan(context, plan);
this.executor.init();
- while(executor.next() != null && !killed) {
+ while(!killed && executor.next() != null) {
++progress;
}
this.executor.close();
@@ -386,21 +385,25 @@ public class Task {
} finally {
setProgressFlag();
stopped = true;
- completed++;
+ completedTasksNum++;
if (killed || aborted) {
context.setProgress(0.0f);
if(killed) {
context.setState(TaskAttemptState.TA_KILLED);
+ masterProxy.statusUpdate(null, getReport(), NullCallback.get());
+ killedTasksNum++;
} else {
context.setState(TaskAttemptState.TA_FAILED);
- }
-
- TaskFatalErrorReport.Builder errorBuilder =
- TaskFatalErrorReport.newBuilder()
- .setId(getId().getProto());
- if (errorMessage != null) {
+ TaskFatalErrorReport.Builder errorBuilder =
+ TaskFatalErrorReport.newBuilder()
+ .setId(getId().getProto());
+ if (errorMessage != null) {
errorBuilder.setErrorMessage(errorMessage);
+ }
+
+ masterProxy.fatalError(null, errorBuilder.build(), NullCallback.get());
+ failedTasksNum++;
}
// stopping the status report
@@ -410,9 +413,6 @@ public class Task {
LOG.warn(e);
}
- masterProxy.fatalError(null, errorBuilder.build(), NullCallback.get());
- failed++;
-
} else {
// if successful
context.setProgress(1.0f);
@@ -427,14 +427,14 @@ public class Task {
TaskCompletionReport report = getTaskCompletionReport();
masterProxy.done(null, report, NullCallback.get());
- succeeded++;
+ succeededTasksNum++;
}
finishTime = System.currentTimeMillis();
cleanupTask();
- LOG.info("Task Counter - total:" + completed + ", succeeded: " + succeeded
- + ", failed: " + failed);
+ LOG.info("Task Counter - total:" + completedTasksNum + ", succeeded: " + succeededTasksNum
+ + ", killed: " + killedTasksNum + ", failed: " + failedTasksNum);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
index 7b49c15..9a38aef 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
@@ -81,8 +81,7 @@ public class TaskRunner extends AbstractService {
// for Fetcher
private final ExecutorService fetchLauncher;
// It keeps all of the query unit attempts while a TaskRunner is running.
- private final Map<QueryUnitAttemptId, Task> tasks =
- new ConcurrentHashMap<QueryUnitAttemptId, Task>();
+ private final Map<QueryUnitAttemptId, Task> tasks = new ConcurrentHashMap<QueryUnitAttemptId, Task>();
private final Map<QueryUnitAttemptId, TaskHistory> taskHistories =
new ConcurrentHashMap<QueryUnitAttemptId, TaskHistory>();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/proto/QueryMasterProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/QueryMasterProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/QueryMasterProtocol.proto
index fe5bf03..e12c9aa 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/QueryMasterProtocol.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/QueryMasterProtocol.proto
@@ -37,5 +37,6 @@ service QueryMasterProtocolService {
rpc done (TaskCompletionReport) returns (BoolProto);
//from TajoMaster's QueryJobManager
+ rpc killQuery(QueryIdProto) returns (BoolProto);
rpc executeQuery(QueryExecutionRequestProto) returns (BoolProto);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
index e08da29..3fdd221 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
@@ -156,9 +156,9 @@ message RunExecutionBlockRequestProto {
service TajoWorkerProtocolService {
rpc ping (QueryUnitAttemptIdProto) returns (BoolProto);
- //from QueryMaster(Worker)
+ // from QueryMaster(Worker)
rpc executeExecutionBlock(RunExecutionBlockRequestProto) returns (BoolProto);
-
+ rpc killTaskAttempt(QueryUnitAttemptIdProto) returns (BoolProto);
rpc cleanup(QueryIdProto) returns (BoolProto);
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/query.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/query.jsp b/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/query.jsp
index 80259c8..e3a356d 100644
--- a/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/query.jsp
+++ b/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/query.jsp
@@ -69,7 +69,7 @@
} else {
%>
<table width="100%" border="1" class='border_table'>
- <tr></tr><th>QueryId</th><th>Query Master</th><th>Started</th><th>Progress</th><th>Time</th><th>sql</th></tr>
+ <tr></tr><th>QueryId</th><th>Query Master</th><th>Started</th><th>Progress</th><th>Time</th><th>Status</th></th><th>sql</th></tr>
<%
for(QueryInProgress eachQuery: runningQueries) {
long time = System.currentTimeMillis() - eachQuery.getQueryInfo().getStartTime();
@@ -82,6 +82,7 @@
<td><%=df.format(eachQuery.getQueryInfo().getStartTime())%></td>
<td><%=(int)(eachQuery.getQueryInfo().getProgress() * 100.0f)%>%</td>
<td><%=StringUtils.formatTime(time)%></td>
+ <td><%=eachQuery.getQueryInfo().getQueryState()%></td>
<td><%=eachQuery.getQueryInfo().getSql()%></td>
</tr>
<%
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/querydetail.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/querydetail.jsp b/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/querydetail.jsp
index 7656dfb..94e5f2e 100644
--- a/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/querydetail.jsp
+++ b/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/querydetail.jsp
@@ -62,7 +62,7 @@
<tr><th>ID</th><th>State</th><th>Started</th><th>Finished</th><th>Running time</th><th>Progress</th><th>Tasks</th></tr>
<%
for(SubQuery eachSubQuery: subQueries) {
- eachSubQuery.getCompletedObjectCount();
+ eachSubQuery.getSucceededObjectCount();
String detailLink = "querytasks.jsp?queryId=" + queryId + "&ebid=" + eachSubQuery.getId();
%>
<tr>
@@ -72,7 +72,7 @@ for(SubQuery eachSubQuery: subQueries) {
<td><%=eachSubQuery.getFinishTime() == 0 ? "-" : df.format(eachSubQuery.getFinishTime())%></td>
<td><%=JSPUtil.getElapsedTime(eachSubQuery.getStartTime(), eachSubQuery.getFinishTime())%></td>
<td align='center'><%=JSPUtil.percentFormat(eachSubQuery.getProgress())%>%</td>
- <td align='center'><a href='<%=detailLink%>&status=SUCCEEDED'><%=eachSubQuery.getCompletedObjectCount()%></a>/<a href='<%=detailLink%>&status=ALL'><%=eachSubQuery.getTotalScheduledObjectsCount()%></a></td>
+ <td align='center'><a href='<%=detailLink%>&status=SUCCEEDED'><%=eachSubQuery.getSucceededObjectCount()%></a>/<a href='<%=detailLink%>&status=ALL'><%=eachSubQuery.getTotalScheduledObjectsCount()%></a></td>
</tr>
<%
} //end of for
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java
index 13db0d0..ee03bd6 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java
@@ -24,14 +24,12 @@ import com.sun.org.apache.commons.logging.Log;
import com.sun.org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.tajo.BackendTestingUtil;
-import org.apache.tajo.IntegrationTest;
-import org.apache.tajo.TajoTestingCluster;
-import org.apache.tajo.TpchTestBase;
+import org.apache.tajo.*;
import org.apache.tajo.catalog.FunctionDesc;
import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.ClientProtos;
import org.apache.tajo.storage.StorageUtil;
import org.apache.tajo.util.CommonTestingUtil;
import org.junit.BeforeClass;
@@ -68,6 +66,15 @@ public class TestTajoClient {
}
@Test
+ public final void testKillQuery() throws IOException, ServiceException, InterruptedException {
+ ClientProtos.GetQueryStatusResponse res = client.executeQuery("select sleep(1) from lineitem");
+ Thread.sleep(1000);
+ QueryId queryId = new QueryId(res.getQueryId());
+ client.killQuery(queryId);
+ assertEquals(TajoProtos.QueryState.QUERY_KILLED, client.getQueryStatus(queryId).getState());
+ }
+
+ @Test
public final void testUpdateQuery() throws IOException, ServiceException {
final String tableName = "testUpdateQuery";
Path tablePath = writeTmpTable(tableName);
[2/2] git commit: TAJO-305: Implement killQuery feature. (hyunsik)
Posted by hy...@apache.org.
TAJO-305: Implement killQuery feature. (hyunsik)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/ae541ffa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/ae541ffa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/ae541ffa
Branch: refs/heads/master
Commit: ae541ffae406faa53ace5ad19cb503b96c39c549
Parents: e2a7dff
Author: Hyunsik Choi <hy...@apache.org>
Authored: Mon Feb 17 18:44:19 2014 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Mon Feb 17 18:44:19 2014 +0900
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../main/java/org/apache/tajo/cli/TajoCli.java | 32 +-
.../java/org/apache/tajo/client/TajoAdmin.java | 28 +-
.../java/org/apache/tajo/client/TajoClient.java | 9 +-
tajo-common/src/main/proto/tajo_protos.proto | 8 +-
.../tajo/engine/function/builtin/Sleep.java | 52 +++
.../SortBasedColPartitionStoreExec.java | 8 +-
.../tajo/master/AbstractTaskScheduler.java | 3 +-
.../tajo/master/DefaultTaskScheduler.java | 11 +
.../apache/tajo/master/TajoContainerProxy.java | 20 ++
.../tajo/master/TajoMasterClientService.java | 13 +-
.../tajo/master/event/LocalTaskEvent.java | 45 +++
.../tajo/master/event/LocalTaskEventType.java | 23 ++
.../tajo/master/event/QueryCompletedEvent.java | 42 +++
.../tajo/master/event/QueryEventType.java | 13 +-
.../tajo/master/event/QueryFinishEvent.java | 39 ---
.../event/QueryMasterQueryCompletedEvent.java | 39 +++
.../master/event/SubQueryCompletedEvent.java | 2 +-
.../tajo/master/event/SubQueryEventType.java | 2 +
.../tajo/master/event/SubQuerySucceeEvent.java | 30 --
.../tajo/master/event/SubQueryTaskEvent.java | 12 +-
.../tajo/master/event/TaskAttemptEventType.java | 2 +
.../tajo/master/event/TaskSchedulerEvent.java | 2 +-
.../apache/tajo/master/querymaster/Query.java | 231 ++++++++-----
.../master/querymaster/QueryInProgress.java | 6 +
.../tajo/master/querymaster/QueryJobEvent.java | 3 +-
.../querymaster/QueryMasterManagerService.java | 42 ++-
.../master/querymaster/QueryMasterTask.java | 46 ++-
.../tajo/master/querymaster/QueryUnit.java | 148 +++++++--
.../master/querymaster/QueryUnitAttempt.java | 107 +++++-
.../tajo/master/querymaster/SubQuery.java | 327 +++++++++++++------
.../tajo/master/querymaster/SubQueryState.java | 5 +-
.../tajo/worker/TajoResourceAllocator.java | 26 +-
.../tajo/worker/TajoWorkerClientService.java | 48 ++-
.../tajo/worker/TajoWorkerManagerService.java | 8 +
.../main/java/org/apache/tajo/worker/Task.java | 38 +--
.../java/org/apache/tajo/worker/TaskRunner.java | 3 +-
.../src/main/proto/QueryMasterProtocol.proto | 1 +
.../src/main/proto/TajoWorkerProtocol.proto | 4 +-
.../src/main/resources/webapps/admin/query.jsp | 3 +-
.../resources/webapps/worker/querydetail.jsp | 4 +-
.../org/apache/tajo/client/TestTajoClient.java | 15 +-
42 files changed, 1053 insertions(+), 449 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9e21181..879d816 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -130,6 +130,8 @@ Release 0.8.0 - unreleased
IMPROVEMENTS
+ TAJO-305: Implement killQuery feature. (hyunsik)
+
TAJO-598: Refactoring Tajo RPC. (jinho)
TAJO-592: HCatalogStore should supports RCFile and default hive field delimiter. (jaehwa)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java b/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java
index 57a7294..f107c51 100644
--- a/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java
+++ b/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java
@@ -286,7 +286,22 @@ public class TajoCli {
} else if (cmds[0].equalsIgnoreCase("detach") && cmds.length > 1 && cmds[1].equalsIgnoreCase("table")) {
// this command should be moved to GlobalEngine
invokeCommand(cmds);
-
+ } else if (cmds[0].equalsIgnoreCase("explain") && cmds.length > 1) {
+ String sql = stripped.substring(8);
+ ClientProtos.ExplainQueryResponse response = client.explainQuery(sql);
+ if (response == null) {
+ sout.println("response is null");
+ } else {
+ if (response.hasExplain()) {
+ sout.println(response.getExplain());
+ } else {
+ if (response.hasErrorMessage()) {
+ sout.println(response.getErrorMessage());
+ } else {
+ sout.println("No Explain");
+ }
+ }
+ }
} else { // submit a query to TajoMaster
ClientProtos.GetQueryStatusResponse response = client.executeQuery(stripped);
if (response == null) {
@@ -299,7 +314,7 @@ public class TajoCli {
if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) {
sout.println("OK");
} else {
- getQueryResult(queryId);
+ waitForQueryCompleted(queryId);
}
} finally {
if(queryId != null) {
@@ -316,11 +331,7 @@ public class TajoCli {
return 0;
}
- private boolean isFailed(QueryState state) {
- return state == QueryState.QUERY_ERROR || state == QueryState.QUERY_FAILED;
- }
-
- private void getQueryResult(QueryId queryId) {
+ private void waitForQueryCompleted(QueryId queryId) {
// if query is empty string
if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) {
return;
@@ -341,14 +352,15 @@ public class TajoCli {
continue;
}
- if (status.getState() == QueryState.QUERY_RUNNING ||
- status.getState() == QueryState.QUERY_SUCCEEDED) {
+ if (status.getState() == QueryState.QUERY_RUNNING || status.getState() == QueryState.QUERY_SUCCEEDED) {
sout.println("Progress: " + (int)(status.getProgress() * 100.0f)
+ "%, response time: " + ((float)(status.getFinishTime() - status.getSubmitTime()) / 1000.0) + " sec");
sout.flush();
}
- if (status.getState() != QueryState.QUERY_RUNNING && status.getState() != QueryState.QUERY_NOT_ASSIGNED) {
+ if (status.getState() != QueryState.QUERY_RUNNING &&
+ status.getState() != QueryState.QUERY_NOT_ASSIGNED &&
+ status.getState() != QueryState.QUERY_KILL_WAIT) {
break;
} else {
Thread.sleep(Math.min(200 * progressRetries, 1000));
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java
index 98939a5..e6fe88f 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java
@@ -25,6 +25,7 @@ import org.apache.tajo.TajoProtos.QueryState;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.ipc.ClientProtos.BriefQueryInfo;
import org.apache.tajo.ipc.ClientProtos.WorkerResourceInfo;
+import org.apache.tajo.util.TajoIdUtils;
import java.io.IOException;
import java.io.PrintWriter;
@@ -58,9 +59,10 @@ public class TajoAdmin {
options = new Options();
options.addOption("h", "host", true, "Tajo server host");
options.addOption("p", "port", true, "Tajo server port");
- options.addOption("list", "list", false, "Show Tajo query list");
- options.addOption("cluster", "cluster", false, "Show Cluster Info");
- options.addOption("desc", "desc", false, "Show Query Description");
+ options.addOption("list", null, false, "Show Tajo query list");
+ options.addOption("cluster", null, false, "Show Cluster Info");
+ options.addOption("desc", null, false, "Show Query Description");
+ options.addOption("kill", null, true, "Kill a running query");
}
private static void printUsage() {
@@ -98,12 +100,17 @@ public class TajoAdmin {
port = Integer.parseInt(cmd.getOptionValue("p"));
}
+ String queryId = null;
+
if (cmd.hasOption("list")) {
cmdType = 1;
} else if (cmd.hasOption("desc")) {
cmdType = 2;
} else if (cmd.hasOption("cluster")) {
cmdType = 3;
+ } else if (cmd.hasOption("kill")) {
+ cmdType = 4;
+ queryId = cmd.getOptionValue("kill");
}
// if there is no "-h" option,
@@ -149,6 +156,9 @@ public class TajoAdmin {
case 3:
processCluster(writer, client);
break;
+ case 4:
+ processKill(writer, client, queryId);
+ break;
default:
printUsage();
break;
@@ -375,7 +385,7 @@ public class TajoAdmin {
writer.write(line);
for (BriefQueryInfo queryInfo : queryList) {
- String queryId = String.format("q-%s-%04d",
+ String queryId = String.format("q_%s_%04d",
queryInfo.getQueryId().getId(),
queryInfo.getQueryId().getSeq());
String state = getQueryState(queryInfo.getState());
@@ -386,4 +396,14 @@ public class TajoAdmin {
writer.write(line);
}
}
+
+ public static void processKill(Writer writer, TajoClient client, String queryIdStr)
+ throws IOException, ServiceException {
+ boolean killedSuccessfully = client.killQuery(TajoIdUtils.parseQueryId(queryIdStr));
+ if (killedSuccessfully) {
+ writer.write(queryIdStr + " is killed successfully.\n");
+ } else {
+ writer.write("killing query is failed.");
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
index 3aeb40e..d9c511e 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
@@ -476,25 +476,22 @@ public class TajoClient {
long currentTimeMillis = System.currentTimeMillis();
long timeKillIssued = currentTimeMillis;
- while ((currentTimeMillis < timeKillIssued + 10000L) && (status.getState()
- != QueryState.QUERY_KILLED)) {
+ while ((currentTimeMillis < timeKillIssued + 10000L) && (status.getState() != QueryState.QUERY_KILLED)) {
try {
- Thread.sleep(1000L);
+ Thread.sleep(100L);
} catch(InterruptedException ie) {
- /** interrupted, just break */
break;
}
currentTimeMillis = System.currentTimeMillis();
status = getQueryStatus(queryId);
}
+ return status.getState() == QueryState.QUERY_KILLED;
} catch(Exception e) {
LOG.debug("Error when checking for application status", e);
return false;
} finally {
connPool.releaseConnection(tmClient);
}
-
- return true;
}
public List<CatalogProtos.FunctionDescProto> getFunctions(final String functionName) throws ServiceException {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-common/src/main/proto/tajo_protos.proto
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/proto/tajo_protos.proto b/tajo-common/src/main/proto/tajo_protos.proto
index d337315..0abc266 100644
--- a/tajo-common/src/main/proto/tajo_protos.proto
+++ b/tajo-common/src/main/proto/tajo_protos.proto
@@ -30,8 +30,9 @@ enum QueryState {
QUERY_SUCCEEDED = 5;
QUERY_FAILED = 6;
QUERY_KILLED = 7;
- QUERY_ERROR = 8;
- QUERY_NOT_ASSIGNED = 9;
+ QUERY_KILL_WAIT = 8;
+ QUERY_ERROR = 9;
+ QUERY_NOT_ASSIGNED = 10;
}
enum TaskAttemptState {
@@ -42,5 +43,6 @@ enum TaskAttemptState {
TA_RUNNING = 4;
TA_SUCCEEDED = 5;
TA_FAILED = 6;
- TA_KILLED = 7;
+ TA_KILL_WAIT = 7;
+ TA_KILLED = 8;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/Sleep.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/Sleep.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/Sleep.java
new file mode 100644
index 0000000..0ae8386
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/Sleep.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.function.builtin;
+
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.function.GeneralFunction;
+import org.apache.tajo.engine.function.annotation.Description;
+import org.apache.tajo.engine.function.annotation.ParamTypes;
+import org.apache.tajo.storage.Tuple;
+
+@Description(
+ functionName = "sleep",
+ description = "sleep for seconds",
+ example = "> SELECT sleep(1) from table1;",
+ returnType = TajoDataTypes.Type.INT4,
+ paramTypes = {@ParamTypes(paramTypes = {TajoDataTypes.Type.INT4})}
+)
+public class Sleep extends GeneralFunction {
+
+ public Sleep() {
+ super(NoArgs);
+ }
+
+ @Override
+ public Datum eval(Tuple params) {
+ try {
+ Thread.sleep(params.getInt4(0) * 1000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ return DatumFactory.createInt4(params.getInt4(0));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java
index 99d673b..8c55d7f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java
@@ -135,9 +135,11 @@ public class SortBasedColPartitionStoreExec extends ColPartitionStoreExec {
@Override
public void close() throws IOException {
- appender.close();
- StatisticsUtil.aggregateTableStat(aggregated, appender.getStats());
- context.setResultStats(aggregated);
+ if (appender != null) {
+ appender.close();
+ StatisticsUtil.aggregateTableStat(aggregated, appender.getStats());
+ context.setResultStats(aggregated);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/AbstractTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/AbstractTaskScheduler.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/AbstractTaskScheduler.java
index acb1dcc..6c187b6 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/AbstractTaskScheduler.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/AbstractTaskScheduler.java
@@ -24,8 +24,7 @@ import org.apache.tajo.master.event.TaskRequestEvent;
import org.apache.tajo.master.event.TaskSchedulerEvent;
-public abstract class AbstractTaskScheduler extends AbstractService
- implements EventHandler<TaskSchedulerEvent> {
+public abstract class AbstractTaskScheduler extends AbstractService implements EventHandler<TaskSchedulerEvent> {
/**
* Construct the service.
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
index 4260c98..cd18e10 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
@@ -222,6 +222,14 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
scheduledRequests.addNonLeafTask(castEvent);
}
}
+ } else if (event.getType() == EventType.T_SCHEDULE_CANCEL) {
+ // When a subquery is killed, unassigned query unit attempts are canceled from the scheduler.
+ // This event is triggered by QueryUnitAttempt.
+ QueryUnitAttemptScheduleEvent castedEvent = (QueryUnitAttemptScheduleEvent) event;
+ scheduledRequests.leafTasks.remove(castedEvent.getQueryUnitAttempt().getId());
+ LOG.info(castedEvent.getQueryUnitAttempt().getId() + " is canceled from " + this.getClass().getSimpleName());
+ ((QueryUnitAttemptScheduleEvent) event).getQueryUnitAttempt().handle(
+ new TaskAttemptEvent(castedEvent.getQueryUnitAttempt().getId(), TaskAttemptEventType.TA_SCHEDULE_CANCELED));
}
}
@@ -360,6 +368,9 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
}
private class ScheduledRequests {
+ // The two sets leafTasks and nonLeafTasks keep all tasks to be scheduled. Even if a task T is included in
+ // leafTaskHostMapping or leafTasksRackMapping, T will not be sent to a task runner
+ // if it is in neither leafTasks nor nonLeafTasks.
private final Set<QueryUnitAttemptId> leafTasks = Collections.synchronizedSet(new HashSet<QueryUnitAttemptId>());
private final Set<QueryUnitAttemptId> nonLeafTasks = Collections.synchronizedSet(new HashSet<QueryUnitAttemptId>());
private Map<String, TaskBlockLocation> leafTaskHostMapping = new HashMap<String, TaskBlockLocation>();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
index f405ea7..e44947e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.proto.YarnProtos;
import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryUnitAttemptId;
import org.apache.tajo.ipc.TajoMasterProtocol;
import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.master.querymaster.QueryMasterTask;
@@ -60,6 +61,25 @@ public class TajoContainerProxy extends ContainerProxy {
assignExecutionBlock(executionBlockId, container);
}
+ /**
+ * It sends a kill RPC request to a corresponding worker.
+ *
+ * @param taskAttemptId The TaskAttemptId to be killed.
+ */
+ public void killTaskAttempt(QueryUnitAttemptId taskAttemptId) {
+ NettyClientBase tajoWorkerRpc = null;
+ try {
+ InetSocketAddress addr = new InetSocketAddress(container.getNodeId().getHost(), container.getNodeId().getPort());
+ tajoWorkerRpc = RpcConnectionPool.getPool(context.getConf()).getConnection(addr, TajoWorkerProtocol.class, true);
+ TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub();
+ tajoWorkerRpcClient.killTaskAttempt(null, taskAttemptId.getProto(), NullCallback.get());
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ } finally {
+ RpcConnectionPool.getPool(context.getConf()).releaseConnection(tajoWorkerRpc);
+ }
+ }
+
private void assignExecutionBlock(ExecutionBlockId executionBlockId, Container container) {
NettyClientBase tajoWorkerRpc = null;
try {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index 0f8eb61..856566a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -43,6 +43,7 @@ import org.apache.tajo.ipc.TajoMasterClientProtocol.TajoMasterClientProtocolServ
import org.apache.tajo.master.TajoMaster.MasterContext;
import org.apache.tajo.master.querymaster.QueryInProgress;
import org.apache.tajo.master.querymaster.QueryInfo;
+import org.apache.tajo.master.querymaster.QueryJobEvent;
import org.apache.tajo.master.querymaster.QueryJobManager;
import org.apache.tajo.master.rm.WorkerResource;
import org.apache.tajo.rpc.BlockingRpcServer;
@@ -296,15 +297,15 @@ public class TajoMasterClientService extends AbstractService {
return builder.build();
}
+ /**
+ * It is invoked by TajoContainerProxy.
+ */
@Override
- public BoolProto killQuery(RpcController controller,
- TajoIdProtos.QueryIdProto request)
- throws ServiceException {
+ public BoolProto killQuery(RpcController controller, TajoIdProtos.QueryIdProto request) throws ServiceException {
QueryId queryId = new QueryId(request);
QueryJobManager queryJobManager = context.getQueryJobManager();
- //TODO KHJ, change QueryJobManager to event handler
- //queryJobManager.handle(new QueryEvent(queryId, QueryEventType.KILL));
-
+ queryJobManager.getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_KILL,
+ new QueryInfo(queryId)));
return BOOL_TRUE;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/LocalTaskEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/LocalTaskEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/LocalTaskEvent.java
new file mode 100644
index 0000000..92e6695
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/LocalTaskEvent.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.event;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.tajo.QueryUnitAttemptId;
+
+/**
+ * An event that is sent to a running TaskAttempt on a worker. Besides the event
+ * type, it carries the id of the task attempt and a container id, both fixed at
+ * construction time.
+ */
+public class LocalTaskEvent extends AbstractEvent<LocalTaskEventType> {
+  /** Container id handed to the constructor. */
+  private final ContainerId containerId;
+  /** Task attempt id handed to the constructor. */
+  private final QueryUnitAttemptId taskAttemptId;
+
+  /**
+   * @param taskAttemptId id of the task attempt this event refers to
+   * @param containerId   container id associated with this event
+   * @param eventType     type of the event, passed to the AbstractEvent constructor
+   */
+  public LocalTaskEvent(QueryUnitAttemptId taskAttemptId, ContainerId containerId, LocalTaskEventType eventType) {
+    super(eventType);
+    this.containerId = containerId;
+    this.taskAttemptId = taskAttemptId;
+  }
+
+  /** @return the container id given at construction */
+  public ContainerId getContainerId() {
+    return this.containerId;
+  }
+
+  /** @return the task attempt id given at construction */
+  public QueryUnitAttemptId getTaskAttemptId() {
+    return this.taskAttemptId;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/LocalTaskEventType.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/LocalTaskEventType.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/LocalTaskEventType.java
new file mode 100644
index 0000000..00b548e
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/LocalTaskEventType.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.event;
+
+/**
+ * Event types used by LocalTaskEvent, which is declared as AbstractEvent&lt;LocalTaskEventType&gt;.
+ */
+public enum LocalTaskEventType {
+ // NOTE(review): presumably requests that the targeted task attempt be killed -- confirm against the handler.
+ KILL
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryCompletedEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryCompletedEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryCompletedEvent.java
new file mode 100644
index 0000000..dc75a1d
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryCompletedEvent.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.event;
+
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.master.querymaster.SubQueryState;
+
+/**
+ * A {@link QueryEvent} of type {@code QUERY_COMPLETED}. The query id of the event is
+ * taken from the given execution block id; the event additionally carries that
+ * execution block id and a {@link SubQueryState}.
+ */
+public class QueryCompletedEvent extends QueryEvent {
+  /** State handed to the constructor; exposed through {@link #getState()}. */
+  private final SubQueryState finalState;
+  /** Execution block id handed to the constructor. */
+  private final ExecutionBlockId executionBlockId;
+
+  /**
+   * @param executionBlockId execution block id; its query id becomes the query id of this event
+   * @param finalState       state reported with this event
+   */
+  public QueryCompletedEvent(final ExecutionBlockId executionBlockId, SubQueryState finalState) {
+    super(executionBlockId.getQueryId(), QueryEventType.QUERY_COMPLETED);
+    this.finalState = finalState;
+    this.executionBlockId = executionBlockId;
+  }
+
+  /** @return the state given at construction */
+  public SubQueryState getState() {
+    return this.finalState;
+  }
+
+  /** @return the execution block id given at construction */
+  public ExecutionBlockId getExecutionBlockId() {
+    return this.executionBlockId;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryEventType.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryEventType.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryEventType.java
index d5f7e38..edc0cd8 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryEventType.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryEventType.java
@@ -19,9 +19,18 @@
package org.apache.tajo.master.event;
public enum QueryEventType {
+
+ // Producer: TajoMaster
START,
- INTERNAL_ERROR,
- SUBQUERY_COMPLETED,
KILL,
+
+ // Producer: SubQuery
+ SUBQUERY_COMPLETED,
+
+ // Producer: Query
+ QUERY_COMPLETED,
+
+ // Producer: Any component
DIAGNOSTIC_UPDATE,
+ INTERNAL_ERROR,
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryFinishEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryFinishEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryFinishEvent.java
deleted file mode 100644
index 9c81132..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryFinishEvent.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.event;
-
-import org.apache.hadoop.yarn.event.AbstractEvent;
-import org.apache.tajo.QueryId;
-
-public class QueryFinishEvent extends AbstractEvent {
- public enum EventType {
- QUERY_FINISH
- }
-
- private final QueryId queryId;
-
- public QueryFinishEvent(QueryId queryId) {
- super(EventType.QUERY_FINISH);
- this.queryId = queryId;
- }
-
- public QueryId getQueryId() {
- return this.queryId;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryMasterQueryCompletedEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryMasterQueryCompletedEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryMasterQueryCompletedEvent.java
new file mode 100644
index 0000000..bc7e0f4
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryMasterQueryCompletedEvent.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.event;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.tajo.QueryId;
+
+/**
+ * An event whose only type is {@link EventType#QUERY_FINISH}; it carries the id of
+ * the query it refers to.
+ */
+public class QueryMasterQueryCompletedEvent extends AbstractEvent<QueryMasterQueryCompletedEvent.EventType> {
+  /** Id of the query, fixed at construction. */
+  private final QueryId queryId;
+
+  /** The single event type this class is ever created with. */
+  public enum EventType {
+    QUERY_FINISH
+  }
+
+  /**
+   * @param queryId id of the query this event refers to
+   */
+  public QueryMasterQueryCompletedEvent(QueryId queryId) {
+    super(EventType.QUERY_FINISH);
+    this.queryId = queryId;
+  }
+
+  /** @return the query id given at construction */
+  public QueryId getQueryId() {
+    return queryId;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryCompletedEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryCompletedEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryCompletedEvent.java
index 7e07525..6389798 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryCompletedEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryCompletedEvent.java
@@ -36,7 +36,7 @@ public class SubQueryCompletedEvent extends QueryEvent {
return executionBlockId;
}
- public SubQueryState getFinalState() {
+ public SubQueryState getState() {
return finalState;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryEventType.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryEventType.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryEventType.java
index 2e56c79..8003ef3 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryEventType.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryEventType.java
@@ -27,6 +27,8 @@ public enum SubQueryEventType {
SQ_INIT,
SQ_START,
SQ_CONTAINER_ALLOCATED,
+ SQ_KILL,
+ SQ_LAUNCH,
// Producer: QueryUnit
SQ_TASK_COMPLETED,
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQuerySucceeEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQuerySucceeEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQuerySucceeEvent.java
deleted file mode 100644
index 2485421..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQuerySucceeEvent.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.event;
-
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.master.querymaster.SubQueryState;
-
-public class SubQuerySucceeEvent extends SubQueryCompletedEvent {
- public SubQuerySucceeEvent(final ExecutionBlockId id) {
- super(id, SubQueryState.SUCCEEDED);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryTaskEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryTaskEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryTaskEvent.java
index 0217f20..0502534 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryTaskEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryTaskEvent.java
@@ -19,19 +19,25 @@
package org.apache.tajo.master.event;
import org.apache.tajo.QueryUnitId;
+import org.apache.tajo.master.TaskState;
/**
* Event Class: From Task to SubQuery
*/
public class SubQueryTaskEvent extends SubQueryEvent {
private QueryUnitId taskId;
- public SubQueryTaskEvent(QueryUnitId taskId,
- SubQueryEventType subQueryEventType) {
- super(taskId.getExecutionBlockId(), subQueryEventType);
+ private TaskState state;
+ public SubQueryTaskEvent(QueryUnitId taskId, TaskState state) {
+ super(taskId.getExecutionBlockId(), SubQueryEventType.SQ_TASK_COMPLETED);
this.taskId = taskId;
+ this.state = state;
}
public QueryUnitId getTaskId() {
return this.taskId;
}
+
+ public TaskState getState() {
+ return state;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptEventType.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptEventType.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptEventType.java
index d9d2f13..e35b154 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptEventType.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptEventType.java
@@ -29,9 +29,11 @@ public enum TaskAttemptEventType {
//Producer:Client, Task
TA_KILL,
+ TA_LOCAL_KILLED,
//Producer:Scheduler
TA_ASSIGNED,
+ TA_SCHEDULE_CANCELED,
//Producer:Scheduler
TA_LAUNCHED,
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskSchedulerEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskSchedulerEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskSchedulerEvent.java
index 9fe2f8c..383845f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskSchedulerEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskSchedulerEvent.java
@@ -25,7 +25,7 @@ import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
public abstract class TaskSchedulerEvent extends AbstractEvent<EventType> {
public enum EventType {
T_SCHEDULE,
- T_SUBQUERY_COMPLETED
+ T_SCHEDULE_CANCEL
}
protected final ExecutionBlockId executionBlockId;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
index f4a6da7..6a4eb4b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
@@ -76,6 +76,10 @@ public class Query implements EventHandler<QueryEvent> {
private long finishTime;
private TableDesc resultDesc;
private int completedSubQueryCount = 0;
+ private int successedSubQueryCount = 0;
+ private int killedSubQueryCount = 0;
+ private int failedSubQueryCount = 0;
+ private int erroredSubQueryCount = 0;
private final List<String> diagnostics = new ArrayList<String>();
// Internal Variables
@@ -89,6 +93,8 @@ public class Query implements EventHandler<QueryEvent> {
// Transition Handler
private static final SingleArcTransition INTERNAL_ERROR_TRANSITION = new InternalErrorTransition();
private static final DiagnosticsUpdateTransition DIAGNOSTIC_UPDATE_TRANSITION = new DiagnosticsUpdateTransition();
+ private static final SubQueryCompletedTransition SUBQUERY_COMPLETED_TRANSITION = new SubQueryCompletedTransition();
+ private static final QueryCompletedTransition QUERY_COMPLETED_TRANSITION = new QueryCompletedTransition();
protected static final StateMachineFactory
<Query,QueryState,QueryEventType,QueryEvent> stateMachineFactory =
@@ -102,23 +108,51 @@ public class Query implements EventHandler<QueryEvent> {
.addTransition(QueryState.QUERY_NEW, QueryState.QUERY_NEW,
QueryEventType.DIAGNOSTIC_UPDATE,
DIAGNOSTIC_UPDATE_TRANSITION)
+ .addTransition(QueryState.QUERY_NEW, QueryState.QUERY_KILLED,
+ QueryEventType.KILL,
+ new KillNewQueryTransition())
.addTransition(QueryState.QUERY_NEW, QueryState.QUERY_ERROR,
QueryEventType.INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
// Transitions from RUNNING state
+ .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_RUNNING,
+ QueryEventType.SUBQUERY_COMPLETED,
+ SUBQUERY_COMPLETED_TRANSITION)
.addTransition(QueryState.QUERY_RUNNING,
- EnumSet.of(QueryState.QUERY_RUNNING, QueryState.QUERY_SUCCEEDED, QueryState.QUERY_FAILED,
+ EnumSet.of(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_FAILED, QueryState.QUERY_KILLED,
QueryState.QUERY_ERROR),
- QueryEventType.SUBQUERY_COMPLETED,
- new SubQueryCompletedTransition())
+ QueryEventType.QUERY_COMPLETED,
+ QUERY_COMPLETED_TRANSITION)
.addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_RUNNING,
QueryEventType.DIAGNOSTIC_UPDATE,
DIAGNOSTIC_UPDATE_TRANSITION)
+ .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_KILL_WAIT,
+ QueryEventType.KILL,
+ new KillSubQueriesTransition())
.addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_ERROR,
QueryEventType.INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
+ // Transitions from KILL_WAIT state
+ .addTransition(QueryState.QUERY_KILL_WAIT, QueryState.QUERY_KILL_WAIT,
+ QueryEventType.SUBQUERY_COMPLETED,
+ SUBQUERY_COMPLETED_TRANSITION)
+ .addTransition(QueryState.QUERY_KILL_WAIT,
+ EnumSet.of(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_FAILED, QueryState.QUERY_KILLED,
+ QueryState.QUERY_ERROR),
+ QueryEventType.QUERY_COMPLETED,
+ QUERY_COMPLETED_TRANSITION)
+ .addTransition(QueryState.QUERY_KILL_WAIT, QueryState.QUERY_KILL_WAIT,
+ QueryEventType.DIAGNOSTIC_UPDATE,
+ DIAGNOSTIC_UPDATE_TRANSITION)
+ .addTransition(QueryState.QUERY_KILL_WAIT, QueryState.QUERY_ERROR,
+ QueryEventType.INTERNAL_ERROR,
+ INTERNAL_ERROR_TRANSITION)
+ // Ignore-able transitions
+ .addTransition(QueryState.QUERY_KILL_WAIT, QueryState.QUERY_KILL_WAIT,
+ EnumSet.of(QueryEventType.KILL))
+
// Transitions from FAILED state
.addTransition(QueryState.QUERY_FAILED, QueryState.QUERY_FAILED,
QueryEventType.DIAGNOSTIC_UPDATE,
@@ -126,6 +160,9 @@ public class Query implements EventHandler<QueryEvent> {
.addTransition(QueryState.QUERY_FAILED, QueryState.QUERY_ERROR,
QueryEventType.INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
+ // Ignore-able transitions
+ .addTransition(QueryState.QUERY_FAILED, QueryState.QUERY_FAILED,
+ QueryEventType.KILL)
// Transitions from ERROR state
.addTransition(QueryState.QUERY_ERROR, QueryState.QUERY_ERROR,
@@ -134,6 +171,9 @@ public class Query implements EventHandler<QueryEvent> {
.addTransition(QueryState.QUERY_ERROR, QueryState.QUERY_ERROR,
QueryEventType.INTERNAL_ERROR,
INTERNAL_ERROR_TRANSITION)
+ // Ignore-able transitions
+ .addTransition(QueryState.QUERY_ERROR, QueryState.QUERY_ERROR,
+ QueryEventType.KILL)
.installTopology();
@@ -294,78 +334,40 @@ public class Query implements EventHandler<QueryEvent> {
}
}
- public static class SubQueryCompletedTransition implements MultipleArcTransition<Query, QueryEvent, QueryState> {
+ public static class QueryCompletedTransition implements MultipleArcTransition<Query, QueryEvent, QueryState> {
- private boolean hasNext(Query query) {
- ExecutionBlockCursor cursor = query.getExecutionBlockCursor();
- ExecutionBlock nextBlock = cursor.peek();
- return !query.getPlan().isTerminal(nextBlock);
- }
-
- private QueryState executeNextBlock(Query query) {
- ExecutionBlockCursor cursor = query.getExecutionBlockCursor();
- ExecutionBlock nextBlock = cursor.nextBlock();
- SubQuery nextSubQuery = new SubQuery(query.context, query.getPlan(), nextBlock, query.sm);
- nextSubQuery.setPriority(query.priority--);
- query.addSubQuery(nextSubQuery);
- nextSubQuery.handle(new SubQueryEvent(nextSubQuery.getId(), SubQueryEventType.SQ_INIT));
-
- LOG.info("Scheduling SubQuery:" + nextSubQuery.getId());
- if(LOG.isDebugEnabled()) {
- LOG.debug("Scheduling SubQuery's Priority: " + nextSubQuery.getPriority());
- LOG.debug("Scheduling SubQuery's Plan: \n" + nextSubQuery.getBlock().getPlan());
+ @Override
+ public QueryState transition(Query query, QueryEvent queryEvent) {
+ QueryCompletedEvent subQueryEvent = (QueryCompletedEvent) queryEvent;
+ QueryState finalState;
+ if (subQueryEvent.getState() == SubQueryState.SUCCEEDED) {
+ finalizeQuery(query, subQueryEvent);
+ finalState = QueryState.QUERY_SUCCEEDED;
+ } else if (subQueryEvent.getState() == SubQueryState.FAILED) {
+ finalState = QueryState.QUERY_FAILED;
+ } else if (subQueryEvent.getState() == SubQueryState.KILLED) {
+ finalState = QueryState.QUERY_KILLED;
+ } else {
+ finalState = QueryState.QUERY_ERROR;
}
-
- return query.checkQueryForCompleted();
+ query.eventHandler.handle(new QueryMasterQueryCompletedEvent(query.getId()));
+ query.setFinishTime();
+ return finalState;
}
- private QueryState finalizeQuery(Query query, SubQueryCompletedEvent event) {
+ private void finalizeQuery(Query query, QueryCompletedEvent event) {
MasterPlan masterPlan = query.getPlan();
- if (query.checkQueryForCompleted() == QueryState.QUERY_SUCCEEDED) {
- ExecutionBlock terminal = query.getPlan().getTerminalBlock();
- DataChannel finalChannel = masterPlan.getChannel(event.getExecutionBlockId(), terminal.getId());
- Path finalOutputDir = commitOutputData(query);
+ ExecutionBlock terminal = query.getPlan().getTerminalBlock();
+ DataChannel finalChannel = masterPlan.getChannel(event.getExecutionBlockId(), terminal.getId());
+ Path finalOutputDir = commitOutputData(query);
- QueryHookExecutor hookExecutor = new QueryHookExecutor(query.context.getQueryMasterContext());
- try {
- hookExecutor.execute(query.context.getQueryContext(), query, event.getExecutionBlockId(),
- finalOutputDir);
- } catch (Exception e) {
- query.eventHandler.handle(new QueryDiagnosticsUpdateEvent(query.id, ExceptionUtils.getStackTrace(e)));
- return QueryState.QUERY_FAILED;
- } finally {
- query.setFinishTime();
- }
- query.finished(QueryState.QUERY_SUCCEEDED);
- query.eventHandler.handle(new QueryFinishEvent(query.getId()));
- }
-
- return QueryState.QUERY_SUCCEEDED;
- }
-
- @Override
- public QueryState transition(Query query, QueryEvent event) {
- // increase the count for completed subqueries
- query.completedSubQueryCount++;
-
- SubQueryCompletedEvent castEvent = (SubQueryCompletedEvent) event;
-
- // if the subquery is succeeded
- if (castEvent.getFinalState() == SubQueryState.SUCCEEDED) {
- if (hasNext(query)) { // if there is next block
- return executeNextBlock(query);
- } else {
- return finalizeQuery(query, castEvent);
- }
- } else {
- query.setFinishTime();
-
- if (castEvent.getFinalState() == SubQueryState.ERROR) {
- return QueryState.QUERY_ERROR;
- } else {
- return QueryState.QUERY_FAILED;
- }
+ QueryHookExecutor hookExecutor = new QueryHookExecutor(query.context.getQueryMasterContext());
+ try {
+ hookExecutor.execute(query.context.getQueryContext(), query, event.getExecutionBlockId(),
+ finalOutputDir);
+ } catch (Exception e) {
+ query.eventHandler.handle(new QueryDiagnosticsUpdateEvent(query.id, ExceptionUtils.getStackTrace(e)));
}
}
@@ -541,6 +543,64 @@ public class Query implements EventHandler<QueryEvent> {
}
}
+ /**
+  * Transition executed when the query handles a {@code SUBQUERY_COMPLETED} event. It tallies the
+  * outcome of the completed subquery and then either starts the next execution block or posts a
+  * {@link QueryCompletedEvent} carrying the state of that subquery.
+  */
+ public static class SubQueryCompletedTransition implements SingleArcTransition<Query, QueryEvent> {
+
+   /** Returns true when the block the cursor would hand out next is not the plan's terminal block. */
+   private boolean hasNext(Query query) {
+     ExecutionBlockCursor cursor = query.getExecutionBlockCursor();
+     ExecutionBlock nextBlock = cursor.peek();
+     return !query.getPlan().isTerminal(nextBlock);
+   }
+
+   /** Creates a SubQuery for the cursor's next block, registers it with the query and sends it SQ_INIT. */
+   private void executeNextBlock(Query query) {
+     ExecutionBlockCursor cursor = query.getExecutionBlockCursor();
+     ExecutionBlock nextBlock = cursor.nextBlock();
+     SubQuery nextSubQuery = new SubQuery(query.context, query.getPlan(), nextBlock, query.sm);
+     nextSubQuery.setPriority(query.priority--);
+     query.addSubQuery(nextSubQuery);
+     nextSubQuery.handle(new SubQueryEvent(nextSubQuery.getId(), SubQueryEventType.SQ_INIT));
+
+     LOG.info("Scheduling SubQuery:" + nextSubQuery.getId());
+     if(LOG.isDebugEnabled()) {
+       LOG.debug("Scheduling SubQuery's Priority: " + nextSubQuery.getPriority());
+       LOG.debug("Scheduling SubQuery's Plan: \n" + nextSubQuery.getBlock().getPlan());
+     }
+   }
+
+   /**
+    * Updates the subquery counters for the reported state, then schedules the next block or
+    * reports query completion. Any Throwable raised here is logged and turned into an
+    * INTERNAL_ERROR event instead of propagating to the caller.
+    *
+    * @param query the query whose subquery completed
+    * @param event the event being handled; cast to {@link SubQueryCompletedEvent}
+    */
+   @Override
+   public void transition(Query query, QueryEvent event) {
+     try {
+       query.completedSubQueryCount++;
+       SubQueryCompletedEvent castEvent = (SubQueryCompletedEvent) event;
+       SubQueryState state = castEvent.getState();
+
+       if (state == SubQueryState.SUCCEEDED) {
+         query.successedSubQueryCount++;
+       } else if (state == SubQueryState.KILLED) {
+         query.killedSubQueryCount++;
+       } else if (state == SubQueryState.FAILED) {
+         query.failedSubQueryCount++;
+       } else if (state == SubQueryState.ERROR) {
+         query.erroredSubQueryCount++;
+       } else {
+         // Any other state is not a valid outcome for a completed subquery: report an internal error.
+         LOG.error(String.format("Invalid SubQuery (%s) State %s at %s",
+             castEvent.getExecutionBlockId().toString(), state.name(), query.getState().name()));
+         query.eventHandler.handle(new QueryEvent(event.getQueryId(), QueryEventType.INTERNAL_ERROR));
+       }
+
+       // Continue only if the latest subquery succeeded, the query is still RUNNING
+       // (i.e. not in KILL_WAIT, FAILED, or ERROR), and the next block is not the terminal one.
+       if (state == SubQueryState.SUCCEEDED &&
+           query.getState() == QueryState.QUERY_RUNNING &&
+           hasNext(query)) {
+         executeNextBlock(query);
+       } else { // the query is completed due to success, kill, failure, or error
+         query.eventHandler.handle(new QueryCompletedEvent(castEvent.getExecutionBlockId(), state));
+       }
+     } catch (Throwable t) {
+       // Pass the throwable as the second argument so the stack trace is logged, not just toString().
+       LOG.error(t.getMessage(), t);
+       query.eventHandler.handle(new QueryEvent(event.getQueryId(), QueryEventType.INTERNAL_ERROR));
+     }
+   }
+ }
+
private static class DiagnosticsUpdateTransition implements SingleArcTransition<Query, QueryEvent> {
@Override
public void transition(Query query, QueryEvent event) {
@@ -548,32 +608,34 @@ public class Query implements EventHandler<QueryEvent> {
}
}
- private static class InternalErrorTransition implements SingleArcTransition<Query, QueryEvent> {
-
+ private static class KillNewQueryTransition implements SingleArcTransition<Query, QueryEvent> {
@Override
public void transition(Query query, QueryEvent event) {
query.setFinishTime();
- query.finished(QueryState.QUERY_ERROR);
+ query.eventHandler.handle(new QueryMasterQueryCompletedEvent(query.getId()));
}
}
- public QueryState finished(QueryState finalState) {
- setFinishTime();
- return finalState;
+ /**
+  * Transition that posts an SQ_KILL SubQueryEvent to the query's event handler for every
+  * subquery currently held in query.subqueries. The iteration runs while holding the
+  * monitor of query.subqueries.
+  */
+ private static class KillSubQueriesTransition implements SingleArcTransition<Query, QueryEvent> {
+ @Override
+ public void transition(Query query, QueryEvent event) {
+ synchronized (query.subqueries) {
+ for (SubQuery subquery : query.subqueries.values()) {
+ query.eventHandler.handle(new SubQueryEvent(subquery.getId(), SubQueryEventType.SQ_KILL));
+ }
+ }
+ }
 }
- /**
- * Check if all subqueries of the query are completed
- * @return QueryState.QUERY_SUCCEEDED if all subqueries are completed.
- */
- QueryState checkQueryForCompleted() {
- if (completedSubQueryCount == subqueries.size()) {
- return QueryState.QUERY_SUCCEEDED;
+ private static class InternalErrorTransition implements SingleArcTransition<Query, QueryEvent> {
+
+ @Override
+ public void transition(Query query, QueryEvent event) {
+ query.setFinishTime();
+ query.eventHandler.handle(new QueryMasterQueryCompletedEvent(query.getId()));
}
- return getState();
}
-
@Override
public void handle(QueryEvent event) {
LOG.info("Processing " + event.getQueryId() + " of type " + event.getType());
@@ -589,8 +651,7 @@ public class Query implements EventHandler<QueryEvent> {
//notify the eventhandler of state change
if (oldState != getState()) {
- LOG.info(id + " Query Transitioned from " + oldState + " to "
- + getState());
+ LOG.info(id + " Query Transitioned from " + oldState + " to " + getState());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
index 2a93d3c..6dc437f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
@@ -91,6 +91,10 @@ public class QueryInProgress extends CompositeService {
super.init(conf);
}
+ public void kill() { // issues the killQuery RPC for this query through queryMasterRpcClient
+ queryMasterRpcClient.killQuery(null, queryId.getProto(), NullCallback.get()); // null controller; NullCallback presumably discards the reply - TODO confirm
+ }
+
@Override
public void stop() {
if(stopped.getAndSet(true)) {
@@ -172,6 +176,8 @@ public class QueryInProgress extends CompositeService {
submmitQueryToMaster();
} else if(queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_FINISH) {
stop();
+ } else if (queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_KILL) {
+ kill();
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobEvent.java
index b2c129f..811de1b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobEvent.java
@@ -38,6 +38,7 @@ public class QueryJobEvent extends AbstractEvent<QueryJobEvent.Type> {
QUERY_JOB_HEARTBEAT,
QUERY_JOB_FINISH,
QUERY_MASTER_START,
- QUERY_MASTER_STOP
+ QUERY_MASTER_STOP,
+ QUERY_JOB_KILL
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
index 5ce57f7..3c30e38 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
@@ -26,10 +26,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.TajoIdProtos;
+import org.apache.tajo.*;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.ipc.QueryMasterProtocol;
@@ -128,13 +125,12 @@ public class QueryMasterManagerService extends CompositeService
try {
ExecutionBlockId ebId = new ExecutionBlockId(request.getExecutionBlockId());
QueryMasterTask queryMasterTask = workerContext.getQueryMaster().getQueryMasterTask(ebId.getQueryId());
- ContainerId cid =
- queryMasterTask.getQueryTaskContext().getResourceAllocator().makeContainerId(request.getContainerId());
if(queryMasterTask.isStopped()) {
- LOG.debug("getTask:" + cid + ", ebId:" + ebId + ", but query is finished.");
done.run(LazyTaskScheduler.stopTaskRunnerReq);
} else {
+ ContainerId cid =
+ queryMasterTask.getQueryTaskContext().getResourceAllocator().makeContainerId(request.getContainerId());
LOG.debug("getTask:" + cid + ", ebId:" + ebId);
queryMasterTask.handleTaskRequestEvent(new TaskRequestEvent(cid, ebId, done));
}
@@ -147,10 +143,26 @@ public class QueryMasterManagerService extends CompositeService
public void statusUpdate(RpcController controller, TajoWorkerProtocol.TaskStatusProto request,
RpcCallback<PrimitiveProtos.BoolProto> done) {
try {
- QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(
- new QueryId(request.getId().getQueryUnitId().getExecutionBlockId().getQueryId()));
- queryMasterTask.getEventHandler().handle(
- new TaskAttemptStatusUpdateEvent(new QueryUnitAttemptId(request.getId()), request));
+ QueryId queryId = new QueryId(request.getId().getQueryUnitId().getExecutionBlockId().getQueryId());
+ QueryUnitAttemptId attemptId = new QueryUnitAttemptId(request.getId());
+ LOG.info("statusUpdate from " + attemptId);
+ QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(queryId);
+ if (queryMasterTask == null) {
+ queryMasterTask = queryMaster.getQueryMasterTask(queryId, true);
+ }
+ SubQuery sq = queryMasterTask.getQuery().getSubQuery(attemptId.getQueryUnitId().getExecutionBlockId());
+ QueryUnit task = sq.getQueryUnit(attemptId.getQueryUnitId());
+ QueryUnitAttempt attempt = task.getAttempt(attemptId.getId());
+ LOG.info(String.format("Task State: %s, Attempt State: %s", task.getState().name(), attempt.getState().name()));
+ if (request.getState() == TajoProtos.TaskAttemptState.TA_KILLED) {
+ LOG.info(attemptId + " Killed");
+ attempt.handle(
+ new TaskAttemptEvent(new QueryUnitAttemptId(request.getId()), TaskAttemptEventType.TA_LOCAL_KILLED));
+ } else {
+ LOG.info(attemptId + " updated");
+ queryMasterTask.getEventHandler().handle(
+ new TaskAttemptStatusUpdateEvent(new QueryUnitAttemptId(request.getId()), request));
+ }
done.run(TajoWorker.TRUE_PROTO);
} catch (Exception e) {
LOG.error(e.getMessage(), e);
@@ -194,6 +206,14 @@ public class QueryMasterManagerService extends CompositeService
}
@Override
+ public void killQuery(RpcController controller, TajoIdProtos.QueryIdProto request,
+                       RpcCallback<PrimitiveProtos.BoolProto> done) {
+   // Delivers a KILL event to the query identified by the request, then acknowledges the
+   // RPC. The acknowledgement only means the request was processed: the kill itself is
+   // carried out by the query's state machine.
+   QueryId queryId = new QueryId(request);
+   QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(queryId);
+   // The task lookup can come back empty and a task may not have built its Query yet,
+   // so both are checked instead of dereferenced blindly.
+   Query query = queryMasterTask == null ? null : queryMasterTask.getQuery();
+   if (query == null) {
+     LOG.warn("Cannot kill " + queryId + ": no running query found");
+   } else {
+     query.handle(new QueryEvent(queryId, QueryEventType.KILL));
+   }
+   // Complete the callback as statusUpdate does, so the caller's RPC is answered.
+   done.run(TajoWorker.TRUE_PROTO);
+ }
+
+ @Override
public void executeQuery(RpcController controller,
TajoWorkerProtocol.QueryExecutionRequestProto request,
RpcCallback<PrimitiveProtos.BoolProto> done) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
index eb0528c..2c3ddfe 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
@@ -45,6 +45,7 @@ import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.ipc.TajoMasterProtocol;
import org.apache.tajo.master.GlobalEngine;
import org.apache.tajo.master.TajoAsyncDispatcher;
+import org.apache.tajo.master.TajoContainerProxy;
import org.apache.tajo.master.event.*;
import org.apache.tajo.master.rm.TajoWorkerResourceManager;
import org.apache.tajo.rpc.CallFuture;
@@ -64,6 +65,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import static org.apache.tajo.TajoProtos.QueryState;
+
public class QueryMasterTask extends CompositeService {
private static final Log LOG = LogFactory.getLog(QueryMasterTask.class.getName());
@@ -135,8 +138,9 @@ public class QueryMasterTask extends CompositeService {
dispatcher.register(SubQueryEventType.class, new SubQueryEventDispatcher());
dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
dispatcher.register(TaskAttemptEventType.class, new TaskAttemptEventDispatcher());
- dispatcher.register(QueryFinishEvent.EventType.class, new QueryFinishEventHandler());
+ dispatcher.register(QueryMasterQueryCompletedEvent.EventType.class, new QueryFinishEventHandler());
dispatcher.register(TaskSchedulerEvent.EventType.class, new TaskSchedulerDispatcher());
+ dispatcher.register(LocalTaskEventType.class, new LocalTaskEventHandler());
initStagingDir();
@@ -247,12 +251,38 @@ public class QueryMasterTask extends CompositeService {
}
}
- private static class QueryFinishEventHandler implements EventHandler<QueryFinishEvent> {
+ /**
+  * Handles LocalTaskEvents by asking the container proxy that hosts the task attempt
+  * to kill it. Events whose container is unknown are logged and dropped instead of
+  * failing with a NullPointerException.
+  */
+ private class LocalTaskEventHandler implements EventHandler<LocalTaskEvent> {
+   @Override
+   public void handle(LocalTaskEvent event) {
+     // The attempt may carry no container id, or the container may no longer be registered.
+     TajoContainerProxy proxy = event.getContainerId() == null ? null :
+         (TajoContainerProxy) resourceAllocator.getContainers().get(event.getContainerId());
+     if (proxy == null) {
+       LOG.warn("Cannot kill " + event.getTaskAttemptId() + ": no container proxy for "
+           + event.getContainerId());
+       return;
+     }
+     proxy.killTaskAttempt(event.getTaskAttemptId());
+   }
+ }
+
+ private class QueryFinishEventHandler implements EventHandler<QueryMasterQueryCompletedEvent> {
@Override
- public void handle(QueryFinishEvent event) {
+ public void handle(QueryMasterQueryCompletedEvent event) {
QueryId queryId = event.getQueryId();
- LOG.info("Query end notification started for QueryId : " + queryId);
- //QueryMaster must be lived until client fetching all query result data.
+ LOG.info("Query completion notified from " + queryId);
+
+ while (!isTerminatedState(query.getState())) {
+ try {
+ synchronized (this) {
+ wait(10);
+ }
+ } catch (InterruptedException e) {
+ LOG.error(e);
+ }
+ }
+ LOG.info("Query final state: " + query.getState());
+ queryMasterContext.stopQuery(queryId);
+ }
+
+ private boolean isTerminatedState(QueryState state) {
+ return
+ state == QueryState.QUERY_SUCCEEDED ||
+ state == QueryState.QUERY_FAILED ||
+ state == QueryState.QUERY_KILLED;
}
}
@@ -307,7 +337,6 @@ public class QueryMasterTask extends CompositeService {
MasterPlan masterPlan = new MasterPlan(queryId, queryContext, plan);
queryMasterContext.getGlobalPlanner().build(masterPlan);
- //this.masterPlan = queryMasterContext.getGlobalOptimizer().optimize(masterPlan);
query = new Query(queryTaskContext, queryId, querySubmitTime,
"", queryTaskContext.getEventHandler(), masterPlan);
@@ -437,9 +466,9 @@ public class QueryMasterTask extends CompositeService {
return queryId;
}
- public TajoProtos.QueryState getState() {
+ public QueryState getState() {
if(query == null) {
- return TajoProtos.QueryState.QUERY_NOT_ASSIGNED;
+ return QueryState.QUERY_NOT_ASSIGNED;
} else {
return query.getState();
}
@@ -513,5 +542,4 @@ public class QueryMasterTask extends CompositeService {
return queryMetrics;
}
}
-
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
index a0f3dfd..2e4bd70 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
@@ -87,30 +87,73 @@ public class QueryUnit implements EventHandler<TaskEvent> {
private List<DataLocation> dataLocations = Lists.newArrayList();
+ private static final AttemptKilledTransition ATTEMPT_KILLED_TRANSITION = new AttemptKilledTransition();
+
protected static final StateMachineFactory
<QueryUnit, TaskState, TaskEventType, TaskEvent> stateMachineFactory =
- new StateMachineFactory
- <QueryUnit, TaskState, TaskEventType, TaskEvent>(TaskState.NEW)
-
- .addTransition(TaskState.NEW, TaskState.SCHEDULED,
- TaskEventType.T_SCHEDULE, new InitialScheduleTransition())
-
- .addTransition(TaskState.SCHEDULED, TaskState.RUNNING,
- TaskEventType.T_ATTEMPT_LAUNCHED, new AttemptLaunchedTransition())
-
- .addTransition(TaskState.RUNNING, TaskState.RUNNING,
- TaskEventType.T_ATTEMPT_LAUNCHED)
-
- .addTransition(TaskState.RUNNING, TaskState.SUCCEEDED,
- TaskEventType.T_ATTEMPT_SUCCEEDED, new AttemptSucceededTransition())
+ new StateMachineFactory <QueryUnit, TaskState, TaskEventType, TaskEvent>(TaskState.NEW)
+
+ // Transitions from NEW state
+ .addTransition(TaskState.NEW, TaskState.SCHEDULED,
+ TaskEventType.T_SCHEDULE,
+ new InitialScheduleTransition())
+ .addTransition(TaskState.NEW, TaskState.KILLED,
+ TaskEventType.T_KILL,
+ new KillNewTaskTransition())
+
+ // Transitions from SCHEDULED state
+ .addTransition(TaskState.SCHEDULED, TaskState.RUNNING,
+ TaskEventType.T_ATTEMPT_LAUNCHED,
+ new AttemptLaunchedTransition())
+ .addTransition(TaskState.SCHEDULED, TaskState.KILL_WAIT,
+ TaskEventType.T_KILL,
+ new KillTaskTransition())
+
+ // Transitions from RUNNING state
+ .addTransition(TaskState.RUNNING, TaskState.RUNNING,
+ TaskEventType.T_ATTEMPT_LAUNCHED)
+ .addTransition(TaskState.RUNNING, TaskState.SUCCEEDED,
+ TaskEventType.T_ATTEMPT_SUCCEEDED,
+ new AttemptSucceededTransition())
+ .addTransition(TaskState.RUNNING, TaskState.KILL_WAIT,
+ TaskEventType.T_KILL,
+ new KillTaskTransition())
+ .addTransition(TaskState.RUNNING,
+ EnumSet.of(TaskState.RUNNING, TaskState.FAILED),
+ TaskEventType.T_ATTEMPT_FAILED,
+ new AttemptFailedOrRetryTransition())
+
+ // Transitions from KILL_WAIT state
+ .addTransition(TaskState.KILL_WAIT, TaskState.KILLED,
+ TaskEventType.T_ATTEMPT_KILLED,
+ ATTEMPT_KILLED_TRANSITION)
+ .addTransition(TaskState.KILL_WAIT, TaskState.KILL_WAIT,
+ TaskEventType.T_ATTEMPT_LAUNCHED,
+ new KillTaskTransition())
+ .addTransition(TaskState.KILL_WAIT, TaskState.FAILED,
+ TaskEventType.T_ATTEMPT_FAILED,
+ new AttemptFailedTransition())
+ .addTransition(TaskState.KILL_WAIT, TaskState.KILLED,
+ TaskEventType.T_ATTEMPT_SUCCEEDED,
+ ATTEMPT_KILLED_TRANSITION)
+ // Ignore-able transitions.
+ .addTransition(TaskState.KILL_WAIT, TaskState.KILL_WAIT,
+ EnumSet.of(
+ TaskEventType.T_KILL,
+ TaskEventType.T_SCHEDULE))
+
+ // Transitions from SUCCEEDED state
+ // Ignore-able transitions
+ .addTransition(TaskState.SUCCEEDED, TaskState.SUCCEEDED,
+ EnumSet.of(TaskEventType.T_KILL, TaskEventType.T_ATTEMPT_KILLED, TaskEventType.T_ATTEMPT_SUCCEEDED))
+
+ // Transitions from FAILED state
+ // Ignore-able transitions
+ .addTransition(TaskState.FAILED, TaskState.FAILED,
+ EnumSet.of(TaskEventType.T_KILL, TaskEventType.T_ATTEMPT_KILLED, TaskEventType.T_ATTEMPT_SUCCEEDED))
+
+ .installTopology();
- .addTransition(TaskState.RUNNING,
- EnumSet.of(TaskState.RUNNING, TaskState.FAILED),
- TaskEventType.T_ATTEMPT_FAILED, new AttemptFailedTransition())
-
-
-
- .installTopology();
private final StateMachine<TaskState, TaskEventType, TaskEvent> stateMachine;
@@ -406,6 +449,36 @@ public class QueryUnit implements EventHandler<TaskEvent> {
}
}
+ private void finishTask() { // stamps this task's finish time
+ this.finishTime = System.currentTimeMillis(); // wall-clock time in milliseconds
+ }
+
+ private static class KillNewTaskTransition implements SingleArcTransition<QueryUnit, TaskEvent> {
+
+ @Override
+ public void transition(QueryUnit task, TaskEvent taskEvent) {
+ task.eventHandler.handle(new SubQueryTaskEvent(task.getId(), TaskState.KILLED));
+ }
+ }
+
+ private static class KillTaskTransition implements SingleArcTransition<QueryUnit, TaskEvent> {
+
+ @Override
+ public void transition(QueryUnit task, TaskEvent taskEvent) {
+ task.finishTask();
+ task.eventHandler.handle(new TaskAttemptEvent(task.lastAttemptId, TaskAttemptEventType.TA_KILL));
+ }
+ }
+
+ private static class AttemptKilledTransition implements SingleArcTransition<QueryUnit, TaskEvent>{
+
+ @Override
+ public void transition(QueryUnit task, TaskEvent event) {
+ task.finishTask();
+ task.eventHandler.handle(new SubQueryTaskEvent(task.getId(), TaskState.KILLED));
+ }
+ }
+
private static class AttemptSucceededTransition
implements SingleArcTransition<QueryUnit, TaskEvent>{
@@ -413,15 +486,14 @@ public class QueryUnit implements EventHandler<TaskEvent> {
public void transition(QueryUnit task,
TaskEvent event) {
TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) event;
- QueryUnitAttempt attempt = task.attempts.get(
- attemptEvent.getTaskAttemptId());
+ QueryUnitAttempt attempt = task.attempts.get(attemptEvent.getTaskAttemptId());
task.successfulAttempt = attemptEvent.getTaskAttemptId();
task.succeededHost = attempt.getHost();
- task.finishTime = System.currentTimeMillis();
task.succeededPullServerPort = attempt.getPullServerPort();
- task.eventHandler.handle(new SubQueryTaskEvent(event.getTaskId(),
- SubQueryEventType.SQ_TASK_COMPLETED));
+
+ task.finishTask();
+ task.eventHandler.handle(new SubQueryTaskEvent(event.getTaskId(), TaskState.SUCCEEDED));
}
}
@@ -430,14 +502,28 @@ public class QueryUnit implements EventHandler<TaskEvent> {
public void transition(QueryUnit task,
TaskEvent event) {
TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) event;
- QueryUnitAttempt attempt = task.attempts.get(
- attemptEvent.getTaskAttemptId());
+ QueryUnitAttempt attempt = task.attempts.get(attemptEvent.getTaskAttemptId());
task.launchTime = System.currentTimeMillis();
task.succeededHost = attempt.getHost();
}
}
- private static class AttemptFailedTransition implements
+ /**
+  * Unconditional failure of a task (no retry): logs a banner, bumps the failed and
+  * finished attempt counters, stamps the finish time and reports the task to the
+  * subquery as FAILED.
+  */
+ private static class AttemptFailedTransition implements SingleArcTransition<QueryUnit, TaskEvent> {
+   @Override
+   public void transition(QueryUnit task, TaskEvent event) {
+     final TaskTAttemptEvent failure = (TaskTAttemptEvent) event;
+     final String banner = "=============================================================";
+     LOG.info(banner);
+     LOG.info(">>> Task Failed: " + failure.getTaskAttemptId() + " <<<");
+     LOG.info(banner);
+     task.failedAttempts++;
+     task.finishedAttempts++;
+
+     task.finishTask();
+     SubQueryTaskEvent failedNotice = new SubQueryTaskEvent(task.getId(), TaskState.FAILED);
+     task.eventHandler.handle(failedNotice);
+   }
+ }
+
+ private static class AttemptFailedOrRetryTransition implements
MultipleArcTransition<QueryUnit, TaskEvent, TaskState> {
@Override
@@ -454,8 +540,8 @@ public class QueryUnit implements EventHandler<TaskEvent> {
task.addAndScheduleAttempt();
}
} else {
- task.eventHandler.handle(
- new SubQueryTaskEvent(task.getId(), SubQueryEventType.SQ_FAILED));
+ task.finishTask();
+ task.eventHandler.handle(new SubQueryTaskEvent(task.getId(), TaskState.FAILED));
return TaskState.FAILED;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
index f5001cc..aac5e37 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
@@ -20,6 +20,7 @@ package org.apache.tajo.master.querymaster;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.state.*;
import org.apache.tajo.QueryUnitAttemptId;
@@ -51,6 +52,7 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
private final QueryUnit queryUnit;
final EventHandler eventHandler;
+ private ContainerId containerId;
private String hostName;
private int port;
private int expire;
@@ -68,44 +70,87 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
<QueryUnitAttempt, TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
(TaskAttemptState.TA_NEW)
+ // Transitions from TA_NEW state
.addTransition(TaskAttemptState.TA_NEW, TaskAttemptState.TA_UNASSIGNED,
TaskAttemptEventType.TA_SCHEDULE, new TaskAttemptScheduleTransition())
.addTransition(TaskAttemptState.TA_NEW, TaskAttemptState.TA_UNASSIGNED,
TaskAttemptEventType.TA_RESCHEDULE, new TaskAttemptScheduleTransition())
+ .addTransition(TaskAttemptState.TA_NEW, TaskAttemptState.TA_KILLED,
+ TaskAttemptEventType.TA_KILL,
+ new TaskKilledCompleteTransition())
+ // Transitions from TA_UNASSIGNED state
.addTransition(TaskAttemptState.TA_UNASSIGNED, TaskAttemptState.TA_ASSIGNED,
- TaskAttemptEventType.TA_ASSIGNED, new LaunchTransition())
+ TaskAttemptEventType.TA_ASSIGNED,
+ new LaunchTransition())
+ .addTransition(TaskAttemptState.TA_UNASSIGNED, TaskAttemptState.TA_KILL_WAIT,
+ TaskAttemptEventType.TA_KILL,
+ new KillUnassignedTaskTransition())
- // from assigned
+ // Transitions from TA_ASSIGNED state
.addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_ASSIGNED,
TaskAttemptEventType.TA_ASSIGNED, new AlreadyAssignedTransition())
+ .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_KILL_WAIT,
+ TaskAttemptEventType.TA_KILL,
+ new KillTaskTransition())
.addTransition(TaskAttemptState.TA_ASSIGNED,
EnumSet.of(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_KILLED),
TaskAttemptEventType.TA_UPDATE, new StatusUpdateTransition())
-
.addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_SUCCEEDED,
TaskAttemptEventType.TA_DONE, new SucceededTransition())
-
.addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_FAILED,
TaskAttemptEventType.TA_FATAL_ERROR, new FailedTransition())
- // from running
+ // Transitions from TA_RUNNING state
.addTransition(TaskAttemptState.TA_RUNNING,
EnumSet.of(TaskAttemptState.TA_RUNNING),
TaskAttemptEventType.TA_UPDATE, new StatusUpdateTransition())
-
+ .addTransition(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_KILL_WAIT,
+ TaskAttemptEventType.TA_KILL,
+ new KillTaskTransition())
.addTransition(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_SUCCEEDED,
TaskAttemptEventType.TA_DONE, new SucceededTransition())
-
.addTransition(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_FAILED,
TaskAttemptEventType.TA_FATAL_ERROR, new FailedTransition())
+ .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILLED,
+ TaskAttemptEventType.TA_LOCAL_KILLED,
+ new TaskKilledCompleteTransition())
+ .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILL_WAIT,
+ TaskAttemptEventType.TA_ASSIGNED,
+ new KillTaskTransition())
+ .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILLED,
+ TaskAttemptEventType.TA_SCHEDULE_CANCELED,
+ new TaskKilledCompleteTransition())
+ .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILLED,
+ TaskAttemptEventType.TA_DONE,
+ new TaskKilledCompleteTransition())
+ .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_FAILED,
+ TaskAttemptEventType.TA_FATAL_ERROR)
+ .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILL_WAIT,
+ EnumSet.of(
+ TaskAttemptEventType.TA_KILL,
+ TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
+ TaskAttemptEventType.TA_UPDATE))
+
+ // Transitions from TA_SUCCEEDED state
.addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_SUCCEEDED,
TaskAttemptEventType.TA_UPDATE)
.addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_SUCCEEDED,
TaskAttemptEventType.TA_DONE, new AlreadyDoneTransition())
.addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_FAILED,
TaskAttemptEventType.TA_FATAL_ERROR, new FailedTransition())
+ // Ignore-able transitions
+ .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_SUCCEEDED,
+ TaskAttemptEventType.TA_KILL)
+
+ // Transitions from TA_KILLED state
+ .addTransition(TaskAttemptState.TA_KILLED, TaskAttemptState.TA_KILLED,
+ TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE)
+ // Ignore-able transitions
+ .addTransition(TaskAttemptState.TA_KILLED, TaskAttemptState.TA_KILLED,
+ EnumSet.of(
+ TaskAttemptEventType.TA_UPDATE))
.installTopology();
@@ -158,6 +203,10 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
return this.port;
}
+ public void setContainerId(ContainerId containerId) { // stores the container id on this attempt
+ this.containerId = containerId;
+ }
+
public void setHost(String host) {
this.hostName = host;
}
@@ -204,17 +253,27 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
}
private static class TaskAttemptScheduleTransition implements
- SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent> {
+ SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent> {
@Override
- public void transition(QueryUnitAttempt taskAttempt,
- TaskAttemptEvent taskAttemptEvent) {
+ public void transition(QueryUnitAttempt taskAttempt, TaskAttemptEvent taskAttemptEvent) {
taskAttempt.eventHandler.handle(new QueryUnitAttemptScheduleEvent(
EventType.T_SCHEDULE, taskAttempt.getQueryUnit().getId().getExecutionBlockId(),
taskAttempt.scheduleContext, taskAttempt));
}
}
+ private static class KillUnassignedTaskTransition implements
+ SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent> {
+
+ @Override
+ public void transition(QueryUnitAttempt taskAttempt, TaskAttemptEvent taskAttemptEvent) {
+ taskAttempt.eventHandler.handle(new QueryUnitAttemptScheduleEvent(
+ EventType.T_SCHEDULE_CANCEL, taskAttempt.getQueryUnit().getId().getExecutionBlockId(),
+ taskAttempt.scheduleContext, taskAttempt));
+ }
+ }
+
private static class LaunchTransition
implements SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent> {
@@ -222,6 +281,7 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
public void transition(QueryUnitAttempt taskAttempt,
TaskAttemptEvent event) {
TaskAttemptAssignedEvent castEvent = (TaskAttemptAssignedEvent) event;
+ taskAttempt.containerId = castEvent.getContainerId();
taskAttempt.setHost(castEvent.getHostName());
taskAttempt.setPullServerPort(castEvent.getPullServerPort());
taskAttempt.eventHandler.handle(
@@ -230,20 +290,29 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
}
}
+ private static class TaskKilledCompleteTransition implements SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent> {
+
+ @Override
+ public void transition(QueryUnitAttempt taskAttempt,
+ TaskAttemptEvent event) {
+ taskAttempt.getQueryUnit().handle(new TaskEvent(taskAttempt.getId().getQueryUnitId(),
+ TaskEventType.T_ATTEMPT_KILLED));
+ LOG.info(taskAttempt.getId() + " Received TA_KILLED Status from LocalTask");
+ }
+ }
+
private static class StatusUpdateTransition
implements MultipleArcTransition<QueryUnitAttempt, TaskAttemptEvent, TaskAttemptState> {
@Override
public TaskAttemptState transition(QueryUnitAttempt taskAttempt,
TaskAttemptEvent event) {
- TaskAttemptStatusUpdateEvent updateEvent =
- (TaskAttemptStatusUpdateEvent) event;
+ TaskAttemptStatusUpdateEvent updateEvent = (TaskAttemptStatusUpdateEvent) event;
switch (updateEvent.getStatus().getState()) {
case TA_PENDING:
case TA_RUNNING:
return TaskAttemptState.TA_RUNNING;
-
default:
return taskAttempt.getState();
}
@@ -289,6 +358,15 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
}
}
+ /**
+  * Requests the kill of an attempt by posting a LocalTaskEvent of type KILL that
+  * carries the attempt id and the container recorded for the attempt.
+  */
+ private static class KillTaskTransition implements SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent> {
+
+   @Override
+   public void transition(QueryUnitAttempt taskAttempt, TaskAttemptEvent event) {
+     // This transition is also wired to TA_KILL_WAIT + TA_ASSIGNED. On that path
+     // LaunchTransition never ran, so the container has to be taken from the
+     // assignment event itself; otherwise the kill would be addressed to a null container.
+     if (event instanceof TaskAttemptAssignedEvent) {
+       taskAttempt.containerId = ((TaskAttemptAssignedEvent) event).getContainerId();
+     }
+     taskAttempt.eventHandler.handle(new LocalTaskEvent(taskAttempt.getId(), taskAttempt.containerId,
+         LocalTaskEventType.KILL));
+   }
+ }
+
private static class FailedTransition implements SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent>{
@Override
public void transition(QueryUnitAttempt taskAttempt, TaskAttemptEvent event) {
@@ -302,8 +380,7 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
@Override
public void handle(TaskAttemptEvent event) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Processing " + event.getTaskAttemptId() + " of type "
- + event.getType());
+ LOG.debug("Processing " + event.getTaskAttemptId() + " of type " + event.getType());
}
try {
writeLock.lock();