You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/04/19 12:46:15 UTC
git commit: TAJO-32: Cleanup TaskRunner. (hyunsik)
Updated Branches:
refs/heads/master 58dc28233 -> 8c60ea8fb
TAJO-32: Cleanup TaskRunner. (hyunsik)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/8c60ea8f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/8c60ea8f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/8c60ea8f
Branch: refs/heads/master
Commit: 8c60ea8fbf67c72b261cac1543eda17c0d779248
Parents: 58dc282
Author: Hyunsik Choi <hy...@apache.org>
Authored: Fri Apr 19 19:40:16 2013 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Fri Apr 19 19:45:16 2013 +0900
----------------------------------------------------------------------
CHANGES.txt | 4 +-
.../src/main/java/tajo/worker/Task.java | 22 +-
.../src/main/java/tajo/worker/TaskRunner.java | 171 ++++++++-------
.../java/tajo/pullserver/PullServerAuxService.java | 9 +-
4 files changed, 113 insertions(+), 93 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8c60ea8f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 1b50ad6..256f224 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,8 @@ Release 0.2.0 - unreleased
IMPROVEMENTS
+ TAJO-32: Cleanup TaskRunner. (hyunsik)
+
TAJO-27: Modify the document links to point the wiki's ones. (hyunsik)
TAJO-17: Improve the examples for jvm version and auxiliary service in
@@ -37,4 +39,4 @@ Release 0.2.0 - unreleased
TAJO-6: Rename tajo.engine.function.builtin.NewSumInt to SumInt. (rsumbaly)
- TAJO-21: CREATE EXTERNAL TABLE should support a file path (JaeHwa Jung)
+ TAJO-21: CREATE EXTERNAL TABLE should support a file path. (JaeHwa Jung)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8c60ea8f/tajo-core/tajo-core-backend/src/main/java/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/worker/Task.java b/tajo-core/tajo-core-backend/src/main/java/tajo/worker/Task.java
index 8b0df26..28ccbfe 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/worker/Task.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/worker/Task.java
@@ -75,6 +75,7 @@ public class Task {
private final LocalDirAllocator lDirAllocator;
private final QueryUnitAttemptId taskId;
+ private final Path taskDir;
private final QueryUnitRequest request;
private final TaskAttemptContext context;
private List<Fetcher> fetcherRunners;
@@ -128,7 +129,7 @@ public class Task {
public Task(QueryUnitAttemptId taskId,
final WorkerContext worker, final Interface masterProxy,
- final QueryUnitRequest request, Path taskDir) throws IOException {
+ final QueryUnitRequest request) throws IOException {
this.request = request;
this.reporter = new Reporter(masterProxy);
this.reporter.startCommunicationThread();
@@ -139,6 +140,8 @@ public class Task {
this.masterProxy = masterProxy;
this.localFS = worker.getLocalFS();
this.lDirAllocator = worker.getLocalDirAllocator();
+ this.taskDir = StorageUtil.concatPath(workerContext.getBaseDir(),
+ taskId.getQueryUnitId().getId() + "_" + taskId.getId());
this.context = new TaskAttemptContext(conf, taskId,
request.getFragments().toArray(new Fragment[request.getFragments().size()]),
@@ -156,9 +159,11 @@ public class Task {
this.sortComp = new TupleComparator(finalSchema, sortNode.getSortKeys());
}
} else {
+ // The final result of a task will be written in a file named part-ss-nnnnnn,
+ // where ss is the subquery id associated with this task, and nnnnnn is the task id.
Path outFilePath = new Path(conf.getOutputPath(),
- OUTPUT_FILE_FORMAT_SUBQUERY.get().format(taskId.getSubQueryId().getId()) +
OUTPUT_FILE_PREFIX +
+ OUTPUT_FILE_FORMAT_SUBQUERY.get().format(taskId.getSubQueryId().getId()) + "-" +
OUTPUT_FILE_FORMAT_TASK.get().format(taskId.getQueryUnitId().getId()));
LOG.info("Output File Path: " + outFilePath);
context.setOutputPath(outFilePath);
@@ -186,6 +191,9 @@ public class Task {
}
public void init() throws IOException {
+ // initialize a temporary directory for this task
+ localFS.mkdirs(taskDir);
+
if (request.getFetches().size() > 0) {
inputTableBaseDir = localFS.makeQualified(
lDirAllocator.getLocalPathForWrite(
@@ -328,16 +336,6 @@ public class Task {
Entry<Integer,String> entry = it.next();
Partition.Builder part = Partition.newBuilder();
part.setPartitionKey(entry.getKey());
- if (partitionType == PARTITION_TYPE.HASH) {
-// part.setFileName(
-// dataServerURL + "/?qid=" + getId().toString() + "&fn=" +
-// entry.getValue());
- } else if (partitionType == PARTITION_TYPE.LIST) {
-// part.setFileName(dataServerURL + "/?qid=" + getId().toString() +
-// "&fn=0");
- } else {
-// part.setFileName(dataServerURL + "/?qid=" + getId().toString());
- }
builder.addPartitions(part.build());
} while (it.hasNext());
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8c60ea8f/tajo-core/tajo-core-backend/src/main/java/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/worker/TaskRunner.java b/tajo-core/tajo-core-backend/src/main/java/tajo/worker/TaskRunner.java
index 9e77164..ff5ca86 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/worker/TaskRunner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/worker/TaskRunner.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.util.ConverterUtils;
import tajo.QueryConf;
@@ -56,8 +55,13 @@ import java.util.concurrent.*;
import static tajo.engine.MasterWorkerProtos.TaskFatalErrorReport;
+/**
+ * The driver class for Tajo QueryUnit processing.
+ */
public class TaskRunner extends AbstractService {
+ /** class logger */
private static final Log LOG = LogFactory.getLog(TaskRunner.class);
+
private QueryConf conf;
private volatile boolean stopped = false;
@@ -70,24 +74,35 @@ public class TaskRunner extends AbstractService {
// Cluster Management
private MasterWorkerProtocolService.Interface master;
- // Query Processing
+ // for temporary or intermediate files
private FileSystem localFS;
+ // for input files
private FileSystem defaultFS;
private TajoQueryEngine queryEngine;
+
+ // TODO - this should be configurable
private final int coreNum = 4;
+
+ // for Fetcher
private final ExecutorService fetchLauncher =
Executors.newFixedThreadPool(coreNum * 4);
+ // It keeps all of the query unit attempts while a TaskRunner is running.
private final Map<QueryUnitAttemptId, Task> tasks =
new ConcurrentHashMap<QueryUnitAttemptId, Task>();
private LocalDirAllocator lDirAllocator;
+ // A thread to receive each assigned query unit and execute the query unit
private Thread taskLauncher;
+ // Contains the object references related to TaskRunner
private WorkerContext workerContext;
+ // for the doAs block
private UserGroupInformation taskOwner;
+ // for the local temporary dir
private String baseDir;
+ private Path baseDirPath;
public TaskRunner(
final SubQueryId subQueryId,
@@ -110,15 +125,23 @@ public class TaskRunner extends AbstractService {
try {
this.workerContext = new WorkerContext();
- baseDir =
- ContainerLocalizer.USERCACHE + "/" + taskOwner.getShortUserName() + "/"
- + ContainerLocalizer.APPCACHE + "/"
- + ConverterUtils.toString(appId)
- + "/output" + "/" + subQueryId.getId();
+ // initialize DFS and LocalFileSystems
+ defaultFS = FileSystem.get(URI.create(conf.get("tajo.rootdir")),conf);
+ localFS = FileSystem.getLocal(conf);
- // Setup LocalDirAllocator
+ // the base dir for an output dir
+ baseDir = ConverterUtils.toString(appId)
+ + "/output" + "/" + subQueryId.getId();
+
+ // initialize LocalDirAllocator
lDirAllocator = new LocalDirAllocator(ConfVars.TASK_LOCAL_DIR.varname);
- LOG.info("Task LocalCache: " + baseDir);
+
+ baseDirPath = localFS.makeQualified(lDirAllocator.getLocalPathForWrite(baseDir, conf));
+ LOG.info("TaskRunner basedir is created (" + baseDir +")");
+
+ // Setup QueryEngine according to the query plan
+ // Here, we can setup row-based query engine or columnar query engine.
+ this.queryEngine = new TajoQueryEngine(conf);
Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook()));
} catch (Throwable t) {
@@ -130,24 +153,13 @@ public class TaskRunner extends AbstractService {
@Override
public void start() {
- try {
- // Setup DFS and LocalFileSystems
- defaultFS = FileSystem.get(URI.create(conf.get("tajo.rootdir")),conf);
- localFS = FileSystem.getLocal(conf);
-
- // Setup QueryEngine according to the query plan
- // Here, we can setup row-based query engine or columnar query engine.
- this.queryEngine = new TajoQueryEngine(conf);
- } catch (Throwable t) {
- LOG.error(t);
- }
-
run();
}
@Override
public void stop() {
if (!isStopped()) {
+ // If TaskRunner is stopped, all running or pending tasks will be marked as failed.
for (Task task : tasks.values()) {
if (task.getStatus() == TaskAttemptState.TA_PENDING ||
task.getStatus() == TaskAttemptState.TA_RUNNING) {
@@ -155,6 +167,7 @@ public class TaskRunner extends AbstractService {
}
}
+ // If this flag becomes true, taskLauncher will be terminated.
this.stopped = true;
LOG.info("STOPPED: " + nodeId);
@@ -204,10 +217,14 @@ public class TaskRunner extends AbstractService {
public ExecutorService getFetchLauncher() {
return fetchLauncher;
}
+
+ public Path getBaseDir() {
+ return baseDirPath;
+ }
}
static void fatalError(MasterWorkerProtocolService.Interface proxy,
- QueryUnitAttemptId taskAttemptId, String message) {
+ QueryUnitAttemptId taskAttemptId, String message) {
TaskFatalErrorReport.Builder builder = TaskFatalErrorReport.newBuilder()
.setId(taskAttemptId.getProto())
.setErrorMessage(message);
@@ -215,7 +232,7 @@ public class TaskRunner extends AbstractService {
}
public void run() {
- LOG.info("Tajo Worker startup");
+ LOG.info("TaskRunner startup");
try {
@@ -228,53 +245,55 @@ public class TaskRunner extends AbstractService {
while(!stopped) {
try {
- if (callFuture == null) {
- callFuture = new CallFuture2<QueryUnitRequestProto>();
- master.getTask(null, ((ContainerIdPBImpl) containerId).getProto(),
- callFuture);
- }
- try {
- taskRequest = callFuture.get(3, TimeUnit.SECONDS);
- } catch (TimeoutException te) {
- LOG.error(te);
- }
-
- if (taskRequest != null) {
- if (taskRequest.getShouldDie()) {
- LOG.info("received ShouldDie flag");
- stop();
-
- } else {
-
- LOG.info("Accumulated Received Task: " + (++receivedNum));
-
- QueryUnitAttemptId taskAttemptId = new QueryUnitAttemptId(taskRequest.getId());
- if (tasks.containsKey(taskAttemptId)) {
- fatalError(master, taskAttemptId, "Duplicate Task Attempt: " + taskAttemptId);
- continue;
- }
-
- Path taskTempDir = localFS.makeQualified(
- lDirAllocator.getLocalPathForWrite(baseDir +
- "/" + taskAttemptId.getQueryUnitId().getId()
- + "_" + taskAttemptId.getId(), conf));
-
- LOG.info("Initializing: " + taskAttemptId);
- Task task = new Task(taskAttemptId, workerContext, master,
- new QueryUnitRequestImpl(taskRequest), taskTempDir);
- tasks.put(taskAttemptId, task);
+ if (callFuture == null) {
+ callFuture = new CallFuture2<QueryUnitRequestProto>();
+ master.getTask(null, ((ContainerIdPBImpl) containerId).getProto(),
+ callFuture);
+ }
+ try {
+ // wait up to 3 seconds for an assigned task
+ taskRequest = callFuture.get(3, TimeUnit.SECONDS);
+ } catch (TimeoutException te) {
+ // if no task has been assigned within that period, callFuture is kept and
+ // TaskRunner waits again on the same pending request (no new request is sent).
+ LOG.error(te);
+ continue;
+ }
+
+ if (taskRequest != null) {
+ // QueryMaster can send the terminal signal to TaskRunner.
+ // If TaskRunner receives the terminal signal, TaskRunner will be terminated
+ // immediately.
+ if (taskRequest.getShouldDie()) {
+ LOG.info("received ShouldDie flag");
+ stop();
+
+ } else {
+
+ LOG.info("Accumulated Received Task: " + (++receivedNum));
+
+ QueryUnitAttemptId taskAttemptId = new QueryUnitAttemptId(taskRequest.getId());
+ if (tasks.containsKey(taskAttemptId)) {
+ fatalError(master, taskAttemptId, "Duplicate Task Attempt: " + taskAttemptId);
+ continue;
+ }
- task.init();
- if (task.hasFetchPhase()) {
- task.fetch(); // The fetch is performed in an asynchronous way.
- }
- // task.run() is a blocking call.
- task.run();
+ LOG.info("Initializing: " + taskAttemptId);
+ Task task = new Task(taskAttemptId, workerContext, master,
+ new QueryUnitRequestImpl(taskRequest));
+ tasks.put(taskAttemptId, task);
- callFuture = null;
- taskRequest = null;
+ task.init();
+ if (task.hasFetchPhase()) {
+ task.fetch(); // The fetch is performed in an asynchronous way.
}
+ // task.run() is a blocking call.
+ task.run();
+
+ callFuture = null;
+ taskRequest = null;
}
+ }
} catch (Throwable t) {
LOG.error(t);
}
@@ -311,10 +330,14 @@ public class TaskRunner extends AbstractService {
}
/**
- * 1st Arg: TaskRunnerListener hostname
- * 2nd Arg: TaskRunnerListener port
- * 3nd Arg: SubQueryId
- * 4th Arg: NodeId
+ * TaskRunner takes 5 arguments as follows:
+ * <ol>
+ * <li>1st: TaskRunnerListener hostname</li>
+ * <li>2nd: TaskRunnerListener port</li>
+ * <li>3rd: SubQueryId</li>
+ * <li>4th: NodeId</li>
+ * <li>5th: ContainerId</li>
+ * </ol>
*/
public static void main(String[] args) throws Exception {
// Restore QueryConf
@@ -333,13 +356,13 @@ public class TaskRunner extends AbstractService {
final InetSocketAddress masterAddr =
NetUtils.createSocketAddrForHost(host, port);
- // SubQueryId
+ // SubQueryId from String
final SubQueryId subQueryId = TajoIdUtils.newSubQueryId(args[2]);
- // NodeId for itself
+ // NodeId has the form hostname:port.
NodeId nodeId = ConverterUtils.toNodeId(args[3]);
ContainerId containerId = ConverterUtils.toContainerId(args[4]);
- // TODO - load credential
+ // TODO - 'load credential' should be implemented
// Getting taskOwner
UserGroupInformation taskOwner =
UserGroupInformation.createRemoteUser(conf.getVar(ConfVars.QUERY_USERNAME));
@@ -349,7 +372,7 @@ public class TaskRunner extends AbstractService {
ProtoAsyncRpcClient client;
MasterWorkerProtocolService.Interface master;
- // Create MasterWorkerProtocol as actual task owner.
+ // initialize MasterWorkerProtocol as an actual task owner.
client =
taskOwner.doAs(new PrivilegedExceptionAction<ProtoAsyncRpcClient>() {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8c60ea8f/tajo-core/tajo-core-pullserver/src/main/java/tajo/pullserver/PullServerAuxService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-pullserver/src/main/java/tajo/pullserver/PullServerAuxService.java b/tajo-core/tajo-core-pullserver/src/main/java/tajo/pullserver/PullServerAuxService.java
index 607662d..c98e70c 100644
--- a/tajo-core/tajo-core-pullserver/src/main/java/tajo/pullserver/PullServerAuxService.java
+++ b/tajo-core/tajo-core-pullserver/src/main/java/tajo/pullserver/PullServerAuxService.java
@@ -385,16 +385,13 @@ public class PullServerAuxService extends AbstractService
List<String> taskIds = splitMaps(taskIdList);
// the working dir of tajo worker for each query
- String base =
- ContainerLocalizer.USERCACHE + "/" + userName + "/"
- + ContainerLocalizer.APPCACHE + "/"
- + appId + "/output" + "/";
+ String queryBaseDir = appId + "/output" + "/";
// if a subquery requires a range partitioning
if (repartitionType.equals("r")) {
String ta = taskIds.get(0);
Path path = localFS.makeQualified(
- lDirAlloc.getLocalPathToRead(base + "/" + sid + "/"
+ lDirAlloc.getLocalPathToRead(queryBaseDir + "/" + sid + "/"
+ ta + "/output/", conf));
String startKey = params.get("start").get(0);
@@ -417,7 +414,7 @@ public class PullServerAuxService extends AbstractService
} else if (repartitionType.equals("h")) {
for (String ta : taskIds) {
Path path = localFS.makeQualified(
- lDirAlloc.getLocalPathToRead(base + "/" + sid + "/" +
+ lDirAlloc.getLocalPathToRead(queryBaseDir + "/" + sid + "/" +
ta + "/output/" + partitionId, conf));
File file = new File(path.toUri());
FileChunk chunk = new FileChunk(file, 0, file.length());