You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/02/06 09:35:47 UTC
[10/11] tajo git commit: TAJO-1336: Fix task failure of stopped task. (jinho)
TAJO-1336: Fix task failure of stopped task. (jinho)
Closes #376
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/161ee9eb
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/161ee9eb
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/161ee9eb
Branch: refs/heads/index_support
Commit: 161ee9ebc641f7e2ab326360b7fc26ab318ef72c
Parents: 42d79cf
Author: jhkim <jh...@apache.org>
Authored: Fri Feb 6 12:29:37 2015 +0900
Committer: jhkim <jh...@apache.org>
Committed: Fri Feb 6 12:29:37 2015 +0900
----------------------------------------------------------------------
CHANGES | 2 +
.../tajo/worker/ExecutionBlockContext.java | 28 ++++++------
.../main/java/org/apache/tajo/worker/Task.java | 47 +++++++++++++-------
.../apache/tajo/worker/TaskAttemptContext.java | 2 +-
.../apache/tajo/worker/TaskRunnerManager.java | 4 +-
.../apache/tajo/querymaster/TestKillQuery.java | 34 ++++++++++++++
6 files changed, 84 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/161ee9eb/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 06ced2d..9043a98 100644
--- a/CHANGES
+++ b/CHANGES
@@ -183,6 +183,8 @@ Release 0.10.0 - unreleased
BUG FIXES
+ TAJO-1336: Fix task failure of stopped task. (jinho)
+
TAJO-1316: NPE occurs when performing window functions after join.
(jihun)
http://git-wip-us.apache.org/repos/asf/tajo/blob/161ee9eb/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
index b120d5b..8cf94eb 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
@@ -28,8 +28,8 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.TajoProtos;
+import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.ipc.QueryMasterProtocol;
@@ -42,7 +42,6 @@ import org.apache.tajo.storage.HashShuffleAppenderManager;
import org.apache.tajo.storage.StorageUtil;
import org.apache.tajo.util.NetUtils;
import org.apache.tajo.util.Pair;
-import org.apache.tajo.worker.event.TaskRunnerStartEvent;
import org.jboss.netty.channel.ConnectTimeoutException;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
import org.jboss.netty.util.Timer;
@@ -75,6 +74,7 @@ public class ExecutionBlockContext {
private FileSystem defaultFS;
private ExecutionBlockId executionBlockId;
private QueryContext queryContext;
+ private TajoWorker.WorkerContext workerContext;
private String plan;
private ExecutionBlockSharedResource resource;
@@ -96,13 +96,14 @@ public class ExecutionBlockContext {
private final ConcurrentMap<String, TaskRunnerHistory> histories = Maps.newConcurrentMap();
- public ExecutionBlockContext(TaskRunnerManager manager, TaskRunnerStartEvent event, WorkerConnectionInfo queryMaster)
- throws Throwable {
+ public ExecutionBlockContext(TajoConf conf, TajoWorker.WorkerContext workerContext,
+ TaskRunnerManager manager, QueryContext queryContext, String plan,
+ ExecutionBlockId executionBlockId, WorkerConnectionInfo queryMaster) throws Throwable {
this.manager = manager;
- this.executionBlockId = event.getExecutionBlockId();
+ this.executionBlockId = executionBlockId;
this.connPool = RpcConnectionPool.getPool();
this.queryMaster = queryMaster;
- this.systemConf = manager.getTajoConf();
+ this.systemConf = conf;
this.reporter = new Reporter();
this.defaultFS = TajoConf.getTajoRootDir(systemConf).getFileSystem(systemConf);
this.localFS = FileSystem.getLocal(systemConf);
@@ -110,11 +111,10 @@ public class ExecutionBlockContext {
// Setup QueryEngine according to the query plan
// Here, we can setup row-based query engine or columnar query engine.
this.queryEngine = new TajoQueryEngine(systemConf);
- this.queryContext = event.getQueryContext();
- this.plan = event.getPlan();
+ this.queryContext = queryContext;
+ this.plan = plan;
this.resource = new ExecutionBlockSharedResource();
-
- init();
+ this.workerContext = workerContext;
}
public void init() throws Throwable {
@@ -193,7 +193,7 @@ public class ExecutionBlockContext {
}
public TajoConf getConf() {
- return manager.getTajoConf();
+ return systemConf;
}
public FileSystem getLocalFS() {
@@ -205,7 +205,7 @@ public class ExecutionBlockContext {
}
public LocalDirAllocator getLocalDirAllocator() {
- return manager.getWorkerContext().getLocalDirAllocator();
+ return workerContext.getLocalDirAllocator();
}
public TajoQueryEngine getTQueryEngine() {
@@ -267,8 +267,8 @@ public class ExecutionBlockContext {
return histories.get(runner.getId());
}
- public TajoWorker.WorkerContext getWorkerContext(){
- return manager.getWorkerContext();
+ public TajoWorker.WorkerContext getWorkerContext() {
+ return workerContext;
}
protected ClientSocketChannelFactory getShuffleChannelFactory(){
http://git-wip-us.apache.org/repos/asf/tajo/blob/161ee9eb/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index e9ad838..8f84a9d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@ -106,11 +106,20 @@ public class Task {
TaskAttemptId taskId,
final ExecutionBlockContext executionBlockContext,
final TaskRequest request) throws IOException {
+ this(taskRunnerId, baseDir, taskId, executionBlockContext.getConf(), executionBlockContext, request);
+ }
+
+ public Task(String taskRunnerId,
+ Path baseDir,
+ TaskAttemptId taskId,
+ TajoConf conf,
+ final ExecutionBlockContext executionBlockContext,
+ final TaskRequest request) throws IOException {
this.taskRunnerId = taskRunnerId;
this.request = request;
this.taskId = taskId;
- this.systemConf = executionBlockContext.getConf();
+ this.systemConf = conf;
this.queryContext = request.getQueryContext(systemConf);
this.executionBlockContext = executionBlockContext;
this.taskDir = StorageUtil.concatPath(baseDir,
@@ -120,8 +129,11 @@ public class Task {
request.getFragments().toArray(new FragmentProto[request.getFragments().size()]), taskDir);
this.context.setDataChannel(request.getDataChannel());
this.context.setEnforcer(request.getEnforcer());
+ this.context.setState(TaskAttemptState.TA_PENDING);
this.inputStats = new TableStats();
+ }
+ public void initPlan() throws IOException {
plan = LogicalNodeDeserializer.deserialize(queryContext, request.getPlan());
LogicalNode [] scanNode = PlannerUtil.findAllNodes(plan, NodeType.SCAN);
if (scanNode != null) {
@@ -157,8 +169,6 @@ public class Task {
}
this.localChunks = Collections.synchronizedList(new ArrayList<FileChunk>());
-
- context.setState(TaskAttemptState.TA_PENDING);
LOG.info("==================================");
LOG.info("* Stage " + request.getId() + " is initialized");
LOG.info("* InterQuery: " + interQuery
@@ -180,6 +190,8 @@ public class Task {
}
public void init() throws IOException {
+ initPlan();
+
if (context.getState() == TaskAttemptState.TA_PENDING) {
// initialize a task temporal dir
FileSystem localFS = executionBlockContext.getLocalFS();
@@ -384,22 +396,23 @@ public class Task {
startTime = System.currentTimeMillis();
Throwable error = null;
try {
- context.setState(TaskAttemptState.TA_RUNNING);
-
- if (context.hasFetchPhase()) {
- // If the fetch is still in progress, the query unit must wait for
- // complete.
- waitForFetch();
- context.setFetcherProgress(FETCHER_PROGRESS);
- context.setProgressChanged(true);
- updateProgress();
- }
+ if(!context.isStopped()) {
+ context.setState(TaskAttemptState.TA_RUNNING);
+ if (context.hasFetchPhase()) {
+ // If the fetch is still in progress, the query unit must wait for
+ // complete.
+ waitForFetch();
+ context.setFetcherProgress(FETCHER_PROGRESS);
+ context.setProgressChanged(true);
+ updateProgress();
+ }
- this.executor = executionBlockContext.getTQueryEngine().
- createPlan(context, plan);
- this.executor.init();
+ this.executor = executionBlockContext.getTQueryEngine().
+ createPlan(context, plan);
+ this.executor.init();
- while(!context.isStopped() && executor.next() != null) {
+ while(!context.isStopped() && executor.next() != null) {
+ }
}
} catch (Throwable e) {
error = e ;
http://git-wip-us.apache.org/repos/asf/tajo/blob/161ee9eb/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
index 1f2c325..50cd20a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
@@ -57,7 +57,7 @@ public class TaskAttemptContext {
private static final Log LOG = LogFactory.getLog(TaskAttemptContext.class);
private final Map<String, List<FragmentProto>> fragmentMap = Maps.newHashMap();
- private TaskAttemptState state;
+ private volatile TaskAttemptState state;
private TableStats resultStats;
private TaskAttemptId queryId;
private final Path workDir;
http://git-wip-us.apache.org/repos/asf/tajo/blob/161ee9eb/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
index 4b10203..57ae566 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
@@ -169,7 +169,9 @@ public class TaskRunnerManager extends CompositeService implements EventHandler<
if(context == null){
try {
- context = new ExecutionBlockContext(this, startEvent, startEvent.getQueryMaster());
+ context = new ExecutionBlockContext(getTajoConf(), getWorkerContext(), this, startEvent.getQueryContext(),
+ startEvent.getPlan(), startEvent.getExecutionBlockId(), startEvent.getQueryMaster());
+ context.init();
} catch (Throwable e) {
LOG.fatal(e.getMessage(), e);
throw new RuntimeException(e);
http://git-wip-us.apache.org/repos/asf/tajo/blob/161ee9eb/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
index 0574bea..1edaa15 100644
--- a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
@@ -23,6 +23,7 @@ import org.apache.tajo.*;
import org.apache.tajo.algebra.Expr;
import org.apache.tajo.benchmark.TPCH;
import org.apache.tajo.catalog.CatalogService;
+import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.client.TajoClient;
import org.apache.tajo.client.TajoClientImpl;
import org.apache.tajo.conf.TajoConf;
@@ -30,6 +31,7 @@ import org.apache.tajo.engine.parser.SQLAnalyzer;
import org.apache.tajo.engine.planner.global.GlobalPlanner;
import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.engine.query.TaskRequestImpl;
import org.apache.tajo.ipc.ClientProtos;
import org.apache.tajo.master.event.QueryEvent;
import org.apache.tajo.master.event.QueryEventType;
@@ -38,13 +40,18 @@ import org.apache.tajo.master.event.StageEventType;
import org.apache.tajo.plan.LogicalOptimizer;
import org.apache.tajo.plan.LogicalPlan;
import org.apache.tajo.plan.LogicalPlanner;
+import org.apache.tajo.plan.serder.PlanProto;
import org.apache.tajo.session.Session;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.worker.ExecutionBlockContext;
+import org.apache.tajo.worker.Task;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import static org.junit.Assert.*;
@@ -190,4 +197,31 @@ public class TestKillQuery {
lastStage.getStateMachine().doTransition(StageEventType.SQ_FAILED,
new StageEvent(lastStage.getId(), StageEventType.SQ_FAILED));
}
+
+ @Test
+ public void testKillTask() throws Throwable {
+ QueryId qid = LocalTajoTestingUtility.newQueryId();
+ ExecutionBlockId eid = QueryIdFactory.newExecutionBlockId(qid, 1);
+ TaskId tid = QueryIdFactory.newTaskId(eid);
+ TajoConf conf = new TajoConf();
+ TaskRequestImpl taskRequest = new TaskRequestImpl();
+
+ taskRequest.set(null, new ArrayList<CatalogProtos.FragmentProto>(),
+ null, false, PlanProto.LogicalNodeTree.newBuilder().build(), new QueryContext(conf), null, null);
+ taskRequest.setInterQuery();
+ TaskAttemptId attemptId = new TaskAttemptId(tid, 1);
+
+ ExecutionBlockContext context = new ExecutionBlockContext(conf, null, null, new QueryContext(conf), null, eid, null);
+
+ org.apache.tajo.worker.Task task = new Task("test", CommonTestingUtil.getTestDir(), attemptId,
+ conf, context, taskRequest);
+ task.kill();
+ assertEquals(TajoProtos.TaskAttemptState.TA_KILLED, task.getStatus());
+ try {
+ task.run();
+ assertEquals(TajoProtos.TaskAttemptState.TA_KILLED, task.getStatus());
+ } catch (Exception e) {
+ assertEquals(TajoProtos.TaskAttemptState.TA_KILLED, task.getStatus());
+ }
+ }
}