You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/04/19 12:46:15 UTC

git commit: TAJO-32: Cleanup TaskRunner. (hyunsik)

Updated Branches:
  refs/heads/master 58dc28233 -> 8c60ea8fb


TAJO-32: Cleanup TaskRunner. (hyunsik)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/8c60ea8f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/8c60ea8f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/8c60ea8f

Branch: refs/heads/master
Commit: 8c60ea8fbf67c72b261cac1543eda17c0d779248
Parents: 58dc282
Author: Hyunsik Choi <hy...@apache.org>
Authored: Fri Apr 19 19:40:16 2013 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Fri Apr 19 19:45:16 2013 +0900

----------------------------------------------------------------------
 CHANGES.txt                                        |    4 +-
 .../src/main/java/tajo/worker/Task.java            |   22 +-
 .../src/main/java/tajo/worker/TaskRunner.java      |  171 ++++++++-------
 .../java/tajo/pullserver/PullServerAuxService.java |    9 +-
 4 files changed, 113 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8c60ea8f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1b50ad6..256f224 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,8 @@ Release 0.2.0 - unreleased
 
   IMPROVEMENTS
 
+    TAJO-32: Cleanup TaskRunner. (hyunsik)
+
     TAJO-27: Modify the document links to point the wiki's ones. (hyunsik)
     
     TAJO-17: Improve the examples for jvm version and auxiliary service in 
@@ -37,4 +39,4 @@ Release 0.2.0 - unreleased
 
     TAJO-6: Rename tajo.engine.function.builtin.NewSumInt to SumInt. (rsumbaly)
 
-    TAJO-21: CREATE EXTERNAL TABLE should support a file path (JaeHwa Jung)
+    TAJO-21: CREATE EXTERNAL TABLE should support a file path. (JaeHwa Jung)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8c60ea8f/tajo-core/tajo-core-backend/src/main/java/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/worker/Task.java b/tajo-core/tajo-core-backend/src/main/java/tajo/worker/Task.java
index 8b0df26..28ccbfe 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/worker/Task.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/worker/Task.java
@@ -75,6 +75,7 @@ public class Task {
   private final LocalDirAllocator lDirAllocator;
   private final QueryUnitAttemptId taskId;
 
+  private final Path taskDir;
   private final QueryUnitRequest request;
   private final TaskAttemptContext context;
   private List<Fetcher> fetcherRunners;
@@ -128,7 +129,7 @@ public class Task {
 
   public Task(QueryUnitAttemptId taskId,
               final WorkerContext worker, final Interface masterProxy,
-              final QueryUnitRequest request, Path taskDir) throws IOException {
+              final QueryUnitRequest request) throws IOException {
     this.request = request;
     this.reporter = new Reporter(masterProxy);
     this.reporter.startCommunicationThread();
@@ -139,6 +140,8 @@ public class Task {
     this.masterProxy = masterProxy;
     this.localFS = worker.getLocalFS();
     this.lDirAllocator = worker.getLocalDirAllocator();
+    this.taskDir = StorageUtil.concatPath(workerContext.getBaseDir(),
+        taskId.getQueryUnitId().getId() + "_" + taskId.getId());
 
     this.context = new TaskAttemptContext(conf, taskId,
         request.getFragments().toArray(new Fragment[request.getFragments().size()]),
@@ -156,9 +159,11 @@ public class Task {
         this.sortComp = new TupleComparator(finalSchema, sortNode.getSortKeys());
       }
     } else {
+      // The final result of a task will be written to a file named part-ss-nnnnnn,
+      // where ss is the subquery id associated with this task, and nnnnnn is the task id.
       Path outFilePath = new Path(conf.getOutputPath(),
-          OUTPUT_FILE_FORMAT_SUBQUERY.get().format(taskId.getSubQueryId().getId()) +
           OUTPUT_FILE_PREFIX +
+          OUTPUT_FILE_FORMAT_SUBQUERY.get().format(taskId.getSubQueryId().getId()) + "-" +
           OUTPUT_FILE_FORMAT_TASK.get().format(taskId.getQueryUnitId().getId()));
       LOG.info("Output File Path: " + outFilePath);
       context.setOutputPath(outFilePath);
@@ -186,6 +191,9 @@ public class Task {
   }
 
   public void init() throws IOException {
+    // initialize the task's temporary directory
+    localFS.mkdirs(taskDir);
+
     if (request.getFetches().size() > 0) {
       inputTableBaseDir = localFS.makeQualified(
           lDirAllocator.getLocalPathForWrite(
@@ -328,16 +336,6 @@ public class Task {
         Entry<Integer,String> entry = it.next();
         Partition.Builder part = Partition.newBuilder();
         part.setPartitionKey(entry.getKey());
-        if (partitionType == PARTITION_TYPE.HASH) {
-//          part.setFileName(
-//              dataServerURL + "/?qid=" + getId().toString() + "&fn=" +
-//                  entry.getValue());
-        } else if (partitionType == PARTITION_TYPE.LIST) {
-//          part.setFileName(dataServerURL + "/?qid=" + getId().toString() +
-//              "&fn=0");
-        } else {
-//          part.setFileName(dataServerURL + "/?qid=" + getId().toString());
-        }
         builder.addPartitions(part.build());
       } while (it.hasNext());
     }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8c60ea8f/tajo-core/tajo-core-backend/src/main/java/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/worker/TaskRunner.java b/tajo-core/tajo-core-backend/src/main/java/tajo/worker/TaskRunner.java
index 9e77164..ff5ca86 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/worker/TaskRunner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/worker/TaskRunner.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
 import org.apache.hadoop.yarn.service.AbstractService;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import tajo.QueryConf;
@@ -56,8 +55,13 @@ import java.util.concurrent.*;
 
 import static tajo.engine.MasterWorkerProtos.TaskFatalErrorReport;
 
+/**
+ * The driver class for Tajo QueryUnit processing.
+ */
 public class TaskRunner extends AbstractService {
+  /** class logger */
   private static final Log LOG = LogFactory.getLog(TaskRunner.class);
+
   private QueryConf conf;
 
   private volatile boolean stopped = false;
@@ -70,24 +74,35 @@ public class TaskRunner extends AbstractService {
   // Cluster Management
   private MasterWorkerProtocolService.Interface master;
 
-  // Query Processing
+  // for temporary or intermediate files
   private FileSystem localFS;
+  // for input files
   private FileSystem defaultFS;
 
   private TajoQueryEngine queryEngine;
+
+  // TODO - this should be configurable
   private final int coreNum = 4;
+
+  // for Fetcher
   private final ExecutorService fetchLauncher =
       Executors.newFixedThreadPool(coreNum * 4);
+  // It keeps all of the query unit attempts while a TaskRunner is running.
   private final Map<QueryUnitAttemptId, Task> tasks =
       new ConcurrentHashMap<QueryUnitAttemptId, Task>();
   private LocalDirAllocator lDirAllocator;
 
+  // A thread to receive each assigned query unit and execute the query unit
   private Thread taskLauncher;
 
+  // Contains the object references related to TaskRunner
   private WorkerContext workerContext;
+  // for the doAs block
   private UserGroupInformation taskOwner;
 
+  // for the local temporary dir
   private String baseDir;
+  private Path baseDirPath;
 
   public TaskRunner(
       final SubQueryId subQueryId,
@@ -110,15 +125,23 @@ public class TaskRunner extends AbstractService {
     try {
       this.workerContext = new WorkerContext();
 
-      baseDir =
-          ContainerLocalizer.USERCACHE + "/" + taskOwner.getShortUserName() + "/"
-              + ContainerLocalizer.APPCACHE + "/"
-              + ConverterUtils.toString(appId)
-              + "/output" + "/" + subQueryId.getId();
+      // initialize DFS and LocalFileSystems
+      defaultFS = FileSystem.get(URI.create(conf.get("tajo.rootdir")),conf);
+      localFS = FileSystem.getLocal(conf);
 
-      // Setup LocalDirAllocator
+      // the base dir for an output dir
+      baseDir = ConverterUtils.toString(appId)
+          + "/output" + "/" + subQueryId.getId();
+
+      // initialize LocalDirAllocator
       lDirAllocator = new LocalDirAllocator(ConfVars.TASK_LOCAL_DIR.varname);
-      LOG.info("Task LocalCache: " + baseDir);
+
+      baseDirPath = localFS.makeQualified(lDirAllocator.getLocalPathForWrite(baseDir, conf));
+      LOG.info("TaskRunner basedir is created (" + baseDir +")");
+
+      // Setup QueryEngine according to the query plan
+      // Here, we can setup row-based query engine or columnar query engine.
+      this.queryEngine = new TajoQueryEngine(conf);
 
       Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook()));
     } catch (Throwable t) {
@@ -130,24 +153,13 @@ public class TaskRunner extends AbstractService {
 
   @Override
   public void start() {
-    try {
-      // Setup DFS and LocalFileSystems
-      defaultFS = FileSystem.get(URI.create(conf.get("tajo.rootdir")),conf);
-      localFS = FileSystem.getLocal(conf);
-
-      // Setup QueryEngine according to the query plan
-      // Here, we can setup row-based query engine or columnar query engine.
-      this.queryEngine = new TajoQueryEngine(conf);
-    } catch (Throwable t) {
-      LOG.error(t);
-    }
-
     run();
   }
 
   @Override
   public void stop() {
     if (!isStopped()) {
+      // If TaskRunner is stopped, all running or pending tasks will be marked as failed.
       for (Task task : tasks.values()) {
         if (task.getStatus() == TaskAttemptState.TA_PENDING ||
             task.getStatus() == TaskAttemptState.TA_RUNNING) {
@@ -155,6 +167,7 @@ public class TaskRunner extends AbstractService {
         }
       }
 
+      // If this flag becomes true, taskLauncher will be terminated.
       this.stopped = true;
 
       LOG.info("STOPPED: " + nodeId);
@@ -204,10 +217,14 @@ public class TaskRunner extends AbstractService {
     public ExecutorService getFetchLauncher() {
       return fetchLauncher;
     }
+
+    public Path getBaseDir() {
+      return baseDirPath;
+    }
   }
 
   static void fatalError(MasterWorkerProtocolService.Interface proxy,
-                                 QueryUnitAttemptId taskAttemptId, String message) {
+                         QueryUnitAttemptId taskAttemptId, String message) {
     TaskFatalErrorReport.Builder builder = TaskFatalErrorReport.newBuilder()
         .setId(taskAttemptId.getProto())
         .setErrorMessage(message);
@@ -215,7 +232,7 @@ public class TaskRunner extends AbstractService {
   }
 
   public void run() {
-    LOG.info("Tajo Worker startup");
+    LOG.info("TaskRunner startup");
 
     try {
 
@@ -228,53 +245,55 @@ public class TaskRunner extends AbstractService {
 
           while(!stopped) {
             try {
-                if (callFuture == null) {
-                  callFuture = new CallFuture2<QueryUnitRequestProto>();
-                  master.getTask(null, ((ContainerIdPBImpl) containerId).getProto(),
-                      callFuture);
-                }
-                try {
-                  taskRequest = callFuture.get(3, TimeUnit.SECONDS);
-                } catch (TimeoutException te) {
-                  LOG.error(te);
-                }
-
-                if (taskRequest != null) {
-                  if (taskRequest.getShouldDie()) {
-                    LOG.info("received ShouldDie flag");
-                    stop();
-
-                  } else {
-
-                    LOG.info("Accumulated Received Task: " + (++receivedNum));
-
-                    QueryUnitAttemptId taskAttemptId = new QueryUnitAttemptId(taskRequest.getId());
-                    if (tasks.containsKey(taskAttemptId)) {
-                      fatalError(master, taskAttemptId, "Duplicate Task Attempt: " + taskAttemptId);
-                      continue;
-                    }
-
-                    Path taskTempDir = localFS.makeQualified(
-                        lDirAllocator.getLocalPathForWrite(baseDir +
-                            "/" + taskAttemptId.getQueryUnitId().getId()
-                            + "_" + taskAttemptId.getId(), conf));
-
-                    LOG.info("Initializing: " + taskAttemptId);
-                    Task task = new Task(taskAttemptId, workerContext, master,
-                        new QueryUnitRequestImpl(taskRequest), taskTempDir);
-                    tasks.put(taskAttemptId, task);
+              if (callFuture == null) {
+                callFuture = new CallFuture2<QueryUnitRequestProto>();
+                master.getTask(null, ((ContainerIdPBImpl) containerId).getProto(),
+                    callFuture);
+              }
+              try {
+                // wait up to 3 seconds for a task to be assigned
+                taskRequest = callFuture.get(3, TimeUnit.SECONDS);
+              } catch (TimeoutException te) {
+                // if no task has been assigned within the given period,
+                // TaskRunner will try again.
+                LOG.error(te);
+                continue;
+              }
+
+              if (taskRequest != null) {
+                // QueryMaster can send a termination signal to TaskRunner.
+                // If TaskRunner receives the termination signal, TaskRunner will be
+                // terminated immediately.
+                if (taskRequest.getShouldDie()) {
+                  LOG.info("received ShouldDie flag");
+                  stop();
+
+                } else {
+
+                  LOG.info("Accumulated Received Task: " + (++receivedNum));
+
+                  QueryUnitAttemptId taskAttemptId = new QueryUnitAttemptId(taskRequest.getId());
+                  if (tasks.containsKey(taskAttemptId)) {
+                    fatalError(master, taskAttemptId, "Duplicate Task Attempt: " + taskAttemptId);
+                    continue;
+                  }
 
-                    task.init();
-                    if (task.hasFetchPhase()) {
-                      task.fetch(); // The fetch is performed in an asynchronous way.
-                    }
-                    // task.run() is a blocking call.
-                    task.run();
+                  LOG.info("Initializing: " + taskAttemptId);
+                  Task task = new Task(taskAttemptId, workerContext, master,
+                      new QueryUnitRequestImpl(taskRequest));
+                  tasks.put(taskAttemptId, task);
 
-                    callFuture = null;
-                    taskRequest = null;
+                  task.init();
+                  if (task.hasFetchPhase()) {
+                    task.fetch(); // The fetch is performed in an asynchronous way.
                   }
+                  // task.run() is a blocking call.
+                  task.run();
+
+                  callFuture = null;
+                  taskRequest = null;
                 }
+              }
             } catch (Throwable t) {
               LOG.error(t);
             }
@@ -311,10 +330,14 @@ public class TaskRunner extends AbstractService {
   }
 
   /**
-   * 1st Arg: TaskRunnerListener hostname
-   * 2nd Arg: TaskRunnerListener port
-   * 3nd Arg: SubQueryId
-   * 4th Arg: NodeId
+   * TaskRunner takes 5 arguments as follows:
+   * <ol>
+   * <li>1st: TaskRunnerListener hostname</li>
+   * <li>2nd: TaskRunnerListener port</li>
+   * <li>3rd: SubQueryId</li>
+   * <li>4th: NodeId</li>
+   * <li>5th: ContainerId</li>
+   * </ol>
    */
   public static void main(String[] args) throws Exception {
     // Restore QueryConf
@@ -333,13 +356,13 @@ public class TaskRunner extends AbstractService {
     final InetSocketAddress masterAddr =
         NetUtils.createSocketAddrForHost(host, port);
 
-    // SubQueryId
+    // SubQueryId from String
     final SubQueryId subQueryId = TajoIdUtils.newSubQueryId(args[2]);
-    // NodeId for itself
+    // NodeId has a form of hostname:port.
     NodeId nodeId = ConverterUtils.toNodeId(args[3]);
     ContainerId containerId = ConverterUtils.toContainerId(args[4]);
 
-    // TODO - load credential
+    // TODO - 'load credential' should be implemented
     // Getting taskOwner
     UserGroupInformation taskOwner =
         UserGroupInformation.createRemoteUser(conf.getVar(ConfVars.QUERY_USERNAME));
@@ -349,7 +372,7 @@ public class TaskRunner extends AbstractService {
     ProtoAsyncRpcClient client;
     MasterWorkerProtocolService.Interface master;
 
-    // Create MasterWorkerProtocol as actual task owner.
+    // initialize MasterWorkerProtocol as an actual task owner.
     client =
         taskOwner.doAs(new PrivilegedExceptionAction<ProtoAsyncRpcClient>() {
           @Override

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8c60ea8f/tajo-core/tajo-core-pullserver/src/main/java/tajo/pullserver/PullServerAuxService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-pullserver/src/main/java/tajo/pullserver/PullServerAuxService.java b/tajo-core/tajo-core-pullserver/src/main/java/tajo/pullserver/PullServerAuxService.java
index 607662d..c98e70c 100644
--- a/tajo-core/tajo-core-pullserver/src/main/java/tajo/pullserver/PullServerAuxService.java
+++ b/tajo-core/tajo-core-pullserver/src/main/java/tajo/pullserver/PullServerAuxService.java
@@ -385,16 +385,13 @@ public class PullServerAuxService extends AbstractService
       List<String> taskIds = splitMaps(taskIdList);
 
       // the working dir of tajo worker for each query
-      String base =
-          ContainerLocalizer.USERCACHE + "/" + userName + "/"
-              + ContainerLocalizer.APPCACHE + "/"
-              + appId + "/output" + "/";
+      String queryBaseDir = appId + "/output" + "/";
 
       // if a subquery requires a range partitioning
       if (repartitionType.equals("r")) {
         String ta = taskIds.get(0);
         Path path = localFS.makeQualified(
-            lDirAlloc.getLocalPathToRead(base + "/" + sid + "/"
+            lDirAlloc.getLocalPathToRead(queryBaseDir + "/" + sid + "/"
                 + ta + "/output/", conf));
 
         String startKey = params.get("start").get(0);
@@ -417,7 +414,7 @@ public class PullServerAuxService extends AbstractService
       } else if (repartitionType.equals("h")) {
         for (String ta : taskIds) {
           Path path = localFS.makeQualified(
-              lDirAlloc.getLocalPathToRead(base + "/" + sid + "/" +
+              lDirAlloc.getLocalPathToRead(queryBaseDir + "/" + sid + "/" +
                   ta + "/output/" + partitionId, conf));
           File file = new File(path.toUri());
           FileChunk chunk = new FileChunk(file, 0, file.length());