You are viewing a plain-text version of this content; the link to the canonical version is not preserved in plain text.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/02/06 09:35:47 UTC

[10/11] tajo git commit: TAJO-1336: Fix task failure of stopped task. (jinho)

TAJO-1336: Fix task failure of stopped task. (jinho)

Closes #376


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/161ee9eb
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/161ee9eb
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/161ee9eb

Branch: refs/heads/index_support
Commit: 161ee9ebc641f7e2ab326360b7fc26ab318ef72c
Parents: 42d79cf
Author: jhkim <jh...@apache.org>
Authored: Fri Feb 6 12:29:37 2015 +0900
Committer: jhkim <jh...@apache.org>
Committed: Fri Feb 6 12:29:37 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |  2 +
 .../tajo/worker/ExecutionBlockContext.java      | 28 ++++++------
 .../main/java/org/apache/tajo/worker/Task.java  | 47 +++++++++++++-------
 .../apache/tajo/worker/TaskAttemptContext.java  |  2 +-
 .../apache/tajo/worker/TaskRunnerManager.java   |  4 +-
 .../apache/tajo/querymaster/TestKillQuery.java  | 34 ++++++++++++++
 6 files changed, 84 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/161ee9eb/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 06ced2d..9043a98 100644
--- a/CHANGES
+++ b/CHANGES
@@ -183,6 +183,8 @@ Release 0.10.0 - unreleased
 
   BUG FIXES
 
+    TAJO-1336: Fix task failure of stopped task. (jinho)
+
     TAJO-1316: NPE occurs when performing window functions after join.
     (jihun)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/161ee9eb/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
index b120d5b..8cf94eb 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java
@@ -28,8 +28,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.TajoProtos;
+import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.QueryMasterProtocol;
@@ -42,7 +42,6 @@ import org.apache.tajo.storage.HashShuffleAppenderManager;
 import org.apache.tajo.storage.StorageUtil;
 import org.apache.tajo.util.NetUtils;
 import org.apache.tajo.util.Pair;
-import org.apache.tajo.worker.event.TaskRunnerStartEvent;
 import org.jboss.netty.channel.ConnectTimeoutException;
 import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
 import org.jboss.netty.util.Timer;
@@ -75,6 +74,7 @@ public class ExecutionBlockContext {
   private FileSystem defaultFS;
   private ExecutionBlockId executionBlockId;
   private QueryContext queryContext;
+  private TajoWorker.WorkerContext workerContext;
   private String plan;
 
   private ExecutionBlockSharedResource resource;
@@ -96,13 +96,14 @@ public class ExecutionBlockContext {
 
   private final ConcurrentMap<String, TaskRunnerHistory> histories = Maps.newConcurrentMap();
 
-  public ExecutionBlockContext(TaskRunnerManager manager, TaskRunnerStartEvent event, WorkerConnectionInfo queryMaster)
-      throws Throwable {
+  public ExecutionBlockContext(TajoConf conf, TajoWorker.WorkerContext workerContext,
+                               TaskRunnerManager manager, QueryContext queryContext, String plan,
+                               ExecutionBlockId executionBlockId, WorkerConnectionInfo queryMaster) throws Throwable {
     this.manager = manager;
-    this.executionBlockId = event.getExecutionBlockId();
+    this.executionBlockId = executionBlockId;
     this.connPool = RpcConnectionPool.getPool();
     this.queryMaster = queryMaster;
-    this.systemConf = manager.getTajoConf();
+    this.systemConf = conf;
     this.reporter = new Reporter();
     this.defaultFS = TajoConf.getTajoRootDir(systemConf).getFileSystem(systemConf);
     this.localFS = FileSystem.getLocal(systemConf);
@@ -110,11 +111,10 @@ public class ExecutionBlockContext {
     // Setup QueryEngine according to the query plan
     // Here, we can setup row-based query engine or columnar query engine.
     this.queryEngine = new TajoQueryEngine(systemConf);
-    this.queryContext = event.getQueryContext();
-    this.plan = event.getPlan();
+    this.queryContext = queryContext;
+    this.plan = plan;
     this.resource = new ExecutionBlockSharedResource();
-
-    init();
+    this.workerContext = workerContext;
   }
 
   public void init() throws Throwable {
@@ -193,7 +193,7 @@ public class ExecutionBlockContext {
   }
 
   public TajoConf getConf() {
-    return manager.getTajoConf();
+    return systemConf;
   }
 
   public FileSystem getLocalFS() {
@@ -205,7 +205,7 @@ public class ExecutionBlockContext {
   }
 
   public LocalDirAllocator getLocalDirAllocator() {
-    return manager.getWorkerContext().getLocalDirAllocator();
+    return workerContext.getLocalDirAllocator();
   }
 
   public TajoQueryEngine getTQueryEngine() {
@@ -267,8 +267,8 @@ public class ExecutionBlockContext {
     return histories.get(runner.getId());
   }
 
-  public TajoWorker.WorkerContext getWorkerContext(){
-    return manager.getWorkerContext();
+  public TajoWorker.WorkerContext getWorkerContext() {
+    return workerContext;
   }
 
   protected ClientSocketChannelFactory getShuffleChannelFactory(){

http://git-wip-us.apache.org/repos/asf/tajo/blob/161ee9eb/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index e9ad838..8f84a9d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@ -106,11 +106,20 @@ public class Task {
               TaskAttemptId taskId,
               final ExecutionBlockContext executionBlockContext,
               final TaskRequest request) throws IOException {
+    this(taskRunnerId, baseDir, taskId, executionBlockContext.getConf(), executionBlockContext, request);
+  }
+
+  public Task(String taskRunnerId,
+              Path baseDir,
+              TaskAttemptId taskId,
+              TajoConf conf,
+              final ExecutionBlockContext executionBlockContext,
+              final TaskRequest request) throws IOException {
     this.taskRunnerId = taskRunnerId;
     this.request = request;
     this.taskId = taskId;
 
-    this.systemConf = executionBlockContext.getConf();
+    this.systemConf = conf;
     this.queryContext = request.getQueryContext(systemConf);
     this.executionBlockContext = executionBlockContext;
     this.taskDir = StorageUtil.concatPath(baseDir,
@@ -120,8 +129,11 @@ public class Task {
         request.getFragments().toArray(new FragmentProto[request.getFragments().size()]), taskDir);
     this.context.setDataChannel(request.getDataChannel());
     this.context.setEnforcer(request.getEnforcer());
+    this.context.setState(TaskAttemptState.TA_PENDING);
     this.inputStats = new TableStats();
+  }
 
+  public void initPlan() throws IOException {
     plan = LogicalNodeDeserializer.deserialize(queryContext, request.getPlan());
     LogicalNode [] scanNode = PlannerUtil.findAllNodes(plan, NodeType.SCAN);
     if (scanNode != null) {
@@ -157,8 +169,6 @@ public class Task {
     }
 
     this.localChunks = Collections.synchronizedList(new ArrayList<FileChunk>());
-    
-    context.setState(TaskAttemptState.TA_PENDING);
     LOG.info("==================================");
     LOG.info("* Stage " + request.getId() + " is initialized");
     LOG.info("* InterQuery: " + interQuery
@@ -180,6 +190,8 @@ public class Task {
   }
 
   public void init() throws IOException {
+    initPlan();
+
     if (context.getState() == TaskAttemptState.TA_PENDING) {
       // initialize a task temporal dir
       FileSystem localFS = executionBlockContext.getLocalFS();
@@ -384,22 +396,23 @@ public class Task {
     startTime = System.currentTimeMillis();
     Throwable error = null;
     try {
-      context.setState(TaskAttemptState.TA_RUNNING);
-
-      if (context.hasFetchPhase()) {
-        // If the fetch is still in progress, the query unit must wait for
-        // complete.
-        waitForFetch();
-        context.setFetcherProgress(FETCHER_PROGRESS);
-        context.setProgressChanged(true);
-        updateProgress();
-      }
+      if(!context.isStopped()) {
+        context.setState(TaskAttemptState.TA_RUNNING);
+        if (context.hasFetchPhase()) {
+          // If the fetch is still in progress, the query unit must wait for
+          // complete.
+          waitForFetch();
+          context.setFetcherProgress(FETCHER_PROGRESS);
+          context.setProgressChanged(true);
+          updateProgress();
+        }
 
-      this.executor = executionBlockContext.getTQueryEngine().
-          createPlan(context, plan);
-      this.executor.init();
+        this.executor = executionBlockContext.getTQueryEngine().
+            createPlan(context, plan);
+        this.executor.init();
 
-      while(!context.isStopped() && executor.next() != null) {
+        while(!context.isStopped() && executor.next() != null) {
+        }
       }
     } catch (Throwable e) {
       error = e ;

http://git-wip-us.apache.org/repos/asf/tajo/blob/161ee9eb/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
index 1f2c325..50cd20a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
@@ -57,7 +57,7 @@ public class TaskAttemptContext {
   private static final Log LOG = LogFactory.getLog(TaskAttemptContext.class);
   private final Map<String, List<FragmentProto>> fragmentMap = Maps.newHashMap();
 
-  private TaskAttemptState state;
+  private volatile TaskAttemptState state;
   private TableStats resultStats;
   private TaskAttemptId queryId;
   private final Path workDir;

http://git-wip-us.apache.org/repos/asf/tajo/blob/161ee9eb/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
index 4b10203..57ae566 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
@@ -169,7 +169,9 @@ public class TaskRunnerManager extends CompositeService implements EventHandler<
 
       if(context == null){
         try {
-          context = new ExecutionBlockContext(this, startEvent, startEvent.getQueryMaster());
+          context = new ExecutionBlockContext(getTajoConf(), getWorkerContext(), this, startEvent.getQueryContext(),
+              startEvent.getPlan(), startEvent.getExecutionBlockId(), startEvent.getQueryMaster());
+          context.init();
         } catch (Throwable e) {
           LOG.fatal(e.getMessage(), e);
           throw new RuntimeException(e);

http://git-wip-us.apache.org/repos/asf/tajo/blob/161ee9eb/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
index 0574bea..1edaa15 100644
--- a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
@@ -23,6 +23,7 @@ import org.apache.tajo.*;
 import org.apache.tajo.algebra.Expr;
 import org.apache.tajo.benchmark.TPCH;
 import org.apache.tajo.catalog.CatalogService;
+import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.client.TajoClient;
 import org.apache.tajo.client.TajoClientImpl;
 import org.apache.tajo.conf.TajoConf;
@@ -30,6 +31,7 @@ import org.apache.tajo.engine.parser.SQLAnalyzer;
 import org.apache.tajo.engine.planner.global.GlobalPlanner;
 import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.engine.query.TaskRequestImpl;
 import org.apache.tajo.ipc.ClientProtos;
 import org.apache.tajo.master.event.QueryEvent;
 import org.apache.tajo.master.event.QueryEventType;
@@ -38,13 +40,18 @@ import org.apache.tajo.master.event.StageEventType;
 import org.apache.tajo.plan.LogicalOptimizer;
 import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.LogicalPlanner;
+import org.apache.tajo.plan.serder.PlanProto;
 import org.apache.tajo.session.Session;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.worker.ExecutionBlockContext;
+import org.apache.tajo.worker.Task;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 
 import static org.junit.Assert.*;
@@ -190,4 +197,31 @@ public class TestKillQuery {
     lastStage.getStateMachine().doTransition(StageEventType.SQ_FAILED,
         new StageEvent(lastStage.getId(), StageEventType.SQ_FAILED));
   }
+
+  @Test
+  public void testKillTask() throws Throwable {
+    QueryId qid = LocalTajoTestingUtility.newQueryId();
+    ExecutionBlockId eid = QueryIdFactory.newExecutionBlockId(qid, 1);
+    TaskId tid = QueryIdFactory.newTaskId(eid);
+    TajoConf conf = new TajoConf();
+    TaskRequestImpl taskRequest = new TaskRequestImpl();
+
+    taskRequest.set(null, new ArrayList<CatalogProtos.FragmentProto>(),
+        null, false, PlanProto.LogicalNodeTree.newBuilder().build(), new QueryContext(conf), null, null);
+    taskRequest.setInterQuery();
+    TaskAttemptId attemptId = new TaskAttemptId(tid, 1);
+
+    ExecutionBlockContext context = new ExecutionBlockContext(conf, null, null, new QueryContext(conf), null, eid, null);
+
+    org.apache.tajo.worker.Task task = new Task("test", CommonTestingUtil.getTestDir(), attemptId,
+        conf, context, taskRequest);
+    task.kill();
+    assertEquals(TajoProtos.TaskAttemptState.TA_KILLED, task.getStatus());
+    try {
+      task.run();
+      assertEquals(TajoProtos.TaskAttemptState.TA_KILLED, task.getStatus());
+    } catch (Exception e) {
+      assertEquals(TajoProtos.TaskAttemptState.TA_KILLED, task.getStatus());
+    }
+  }
 }