You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/04/02 19:17:44 UTC
git commit: TAJO-15: The Integration test is getting hanged on Mac OS
X. (hyunsik)
Updated Branches:
refs/heads/master c2362cf3b -> fe5b030c5
TAJO-15: The Integration test is getting hanged on Mac OS X. (hyunsik)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/fe5b030c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/fe5b030c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/fe5b030c
Branch: refs/heads/master
Commit: fe5b030c53f9babf11370cf3e483898fa2fe3063
Parents: c2362cf
Author: Hyunsik Choi <hy...@apache.org>
Authored: Wed Apr 3 02:09:19 2013 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Wed Apr 3 02:14:33 2013 +0900
----------------------------------------------------------------------
CHANGES.txt | 10 +-
.../src/main/java/tajo/worker/TaskRunner.java | 98 ++++-----------
.../src/test/java/tajo/worker/TaskRunnerTest.java | 3 +-
3 files changed, 32 insertions(+), 79 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fe5b030c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index da3a5a2..d35db48 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,7 +9,7 @@ Release 0.2.0 - unreleased
TAJO-17: Improve the examples for jvm version and auxiliary service in
getting_started.apt. (hyunsik)
- TAJO-10: Modify git ignore to include Apache derby log file
+ TAJO-10: Modify git ignore to include Apache derby log file. (hsaputra)
TAJO-12: Add information in README on how to subscribe to mailing
lists (mattmann, hyunsik)
@@ -22,9 +22,13 @@ Release 0.2.0 - unreleased
BUG FIXES
- TAJO-8: MapReduce's ShuffleHandler and Tajo's PullServerAuxService conflict each other
+ TAJO-15: The Integration test is getting hanged on Mac OS X. (hyunsik)
- TAJO-13: Update the groupId property in the pom.xml to use ASF org instead of kr.ac.korea.dbserver
+ TAJO-8: MapReduce's ShuffleHandler and Tajo's PullServerAuxService conflict
+ each other. (jhkim)
+
+ TAJO-13: Update the groupId property in the pom.xml to use ASF org instead
+ of kr.ac.korea.dbserver (hsaputra)
TAJO-1: RCFileWrapper always reads whole columns regardless of the target
schema. (jihoonson via hyunsik)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fe5b030c/tajo-core/tajo-core-backend/src/main/java/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/worker/TaskRunner.java b/tajo-core/tajo-core-backend/src/main/java/tajo/worker/TaskRunner.java
index cf0dbc4..9e77164 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/worker/TaskRunner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/worker/TaskRunner.java
@@ -18,7 +18,6 @@
package tajo.worker;
-import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -39,7 +38,6 @@ import tajo.QueryUnitAttemptId;
import tajo.SubQueryId;
import tajo.TajoProtos.TaskAttemptState;
import tajo.conf.TajoConf.ConfVars;
-import tajo.engine.MasterWorkerProtos;
import tajo.engine.MasterWorkerProtos.QueryUnitRequestProto;
import tajo.engine.query.QueryUnitRequestImpl;
import tajo.ipc.MasterWorkerProtocol;
@@ -50,19 +48,19 @@ import tajo.rpc.NullCallback;
import tajo.rpc.ProtoAsyncRpcClient;
import tajo.util.TajoIdUtils;
-import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.util.Map;
import java.util.concurrent.*;
+import static tajo.engine.MasterWorkerProtos.TaskFatalErrorReport;
+
public class TaskRunner extends AbstractService {
private static final Log LOG = LogFactory.getLog(TaskRunner.class);
private QueryConf conf;
private volatile boolean stopped = false;
- private volatile boolean isOnline = false;
private final SubQueryId subQueryId;
private ApplicationId appId;
@@ -70,7 +68,6 @@ public class TaskRunner extends AbstractService {
private final ContainerId containerId;
// Cluster Management
- private ProtoAsyncRpcClient client;
private MasterWorkerProtocolService.Interface master;
// Query Processing
@@ -78,7 +75,6 @@ public class TaskRunner extends AbstractService {
private FileSystem defaultFS;
private TajoQueryEngine queryEngine;
- private QueryLauncher queryLauncher;
private final int coreNum = 4;
private final ExecutorService fetchLauncher =
Executors.newFixedThreadPool(coreNum * 4);
@@ -97,14 +93,12 @@ public class TaskRunner extends AbstractService {
final SubQueryId subQueryId,
final NodeId nodeId,
UserGroupInformation taskOwner,
- ProtoAsyncRpcClient client,
Interface master, ContainerId containerId) {
super(TaskRunner.class.getName());
this.subQueryId = subQueryId;
this.appId = subQueryId.getQueryId().getApplicationId();
this.nodeId = nodeId;
this.taskOwner = taskOwner;
- this.client = client;
this.master = master;
this.containerId = containerId;
}
@@ -143,7 +137,6 @@ public class TaskRunner extends AbstractService {
// Setup QueryEngine according to the query plan
// Here, we can setup row-based query engine or columnar query engine.
- this.queryLauncher = new QueryLauncher();
this.queryEngine = new TajoQueryEngine(conf);
} catch (Throwable t) {
LOG.error(t);
@@ -168,8 +161,6 @@ public class TaskRunner extends AbstractService {
synchronized (this) {
notifyAll();
}
-
- client.close();
}
}
@@ -215,6 +206,14 @@ public class TaskRunner extends AbstractService {
}
}
+ static void fatalError(MasterWorkerProtocolService.Interface proxy,
+ QueryUnitAttemptId taskAttemptId, String message) {
+ TaskFatalErrorReport.Builder builder = TaskFatalErrorReport.newBuilder()
+ .setId(taskAttemptId.getProto())
+ .setErrorMessage(message);
+ proxy.fatalError(null, builder.build(), NullCallback.get());
+ }
+
public void run() {
LOG.info("Tajo Worker startup");
@@ -229,12 +228,6 @@ public class TaskRunner extends AbstractService {
while(!stopped) {
try {
-
- while(!stopped && !queryLauncher.hasAvailableSlot()) {
- Thread.sleep(1000);
- }
-
- if (!stopped) {
if (callFuture == null) {
callFuture = new CallFuture2<QueryUnitRequestProto>();
master.getTask(null, ((ContainerIdPBImpl) containerId).getProto(),
@@ -250,38 +243,38 @@ public class TaskRunner extends AbstractService {
if (taskRequest.getShouldDie()) {
LOG.info("received ShouldDie flag");
stop();
+
} else {
+
LOG.info("Accumulated Received Task: " + (++receivedNum));
- QueryUnitAttemptId taskAttemptId =
- new QueryUnitAttemptId(taskRequest.getId());
+
+ QueryUnitAttemptId taskAttemptId = new QueryUnitAttemptId(taskRequest.getId());
if (tasks.containsKey(taskAttemptId)) {
- MasterWorkerProtos.TaskFatalErrorReport.Builder builder =
- MasterWorkerProtos.TaskFatalErrorReport.newBuilder()
- .setErrorMessage("Duplicate Task Attempt: " +
- taskAttemptId);
- master.fatalError(null, builder.build(), NullCallback.get());
+ fatalError(master, taskAttemptId, "Duplicate Task Attempt: " + taskAttemptId);
continue;
}
+
Path taskTempDir = localFS.makeQualified(
lDirAllocator.getLocalPathForWrite(baseDir +
"/" + taskAttemptId.getQueryUnitId().getId()
+ "_" + taskAttemptId.getId(), conf));
+
LOG.info("Initializing: " + taskAttemptId);
Task task = new Task(taskAttemptId, workerContext, master,
new QueryUnitRequestImpl(taskRequest), taskTempDir);
tasks.put(taskAttemptId, task);
+
task.init();
if (task.hasFetchPhase()) {
task.fetch(); // The fetch is performed in an asynchronous way.
}
-
+ // task.run() is a blocking call.
task.run();
callFuture = null;
taskRequest = null;
}
}
- }
} catch (Throwable t) {
LOG.error(t);
}
@@ -299,11 +292,7 @@ public class TaskRunner extends AbstractService {
t.abort();
}
}
-
- client.close();
}
-
- LOG.info("TaskRunner (" + nodeId + ") main thread exiting");
}
private class ShutdownHook implements Runnable {
@@ -314,10 +303,6 @@ public class TaskRunner extends AbstractService {
}
}
- public String getServerName() {
- return nodeId.toString();
- }
-
/**
* @return true if a stop has been requested.
*/
@@ -325,42 +310,6 @@ public class TaskRunner extends AbstractService {
return this.stopped;
}
- public boolean isOnline() {
- return this.isOnline;
- }
-
- public void shutdown(final String msg) {
-
- }
-
- @VisibleForTesting
- Task getTask(QueryUnitAttemptId id) {
- return this.tasks.get(id);
- }
-
- private class QueryLauncher {
- private final ThreadPoolExecutor executor
- = new ThreadPoolExecutor(coreNum, coreNum * 4, 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<Runnable>(coreNum * 4));
- private boolean stopped = false;
-
- public void schedule(Task task) throws InterruptedException {
-
- }
-
- public boolean hasAvailableSlot() {
- return executor.getQueue().size() < coreNum;
- }
- }
-
- public Path getTaskTempDir(QueryUnitAttemptId taskAttemptId)
- throws IOException {
- return lDirAllocator.
- getLocalPathToRead(baseDir + "/" + taskAttemptId.getId(),
- conf);
- }
-
-
/**
* 1st Arg: TaskRunnerListener hostname
* 2nd Arg: TaskRunnerListener port
@@ -368,8 +317,6 @@ public class TaskRunner extends AbstractService {
* 4th Arg: NodeId
*/
public static void main(String[] args) throws Exception {
- LOG.info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>");
- System.out.println(System.getenv("CLASSPATH"));
// Restore QueryConf
final QueryConf conf = new QueryConf();
conf.addResource(new Path(QueryConf.FILENAME));
@@ -402,7 +349,7 @@ public class TaskRunner extends AbstractService {
ProtoAsyncRpcClient client;
MasterWorkerProtocolService.Interface master;
- // Create TaskUmbilicalProtocol as actual task owner.
+ // Create MasterWorkerProtocol as actual task owner.
client =
taskOwner.doAs(new PrivilegedExceptionAction<ProtoAsyncRpcClient>() {
@Override
@@ -413,8 +360,11 @@ public class TaskRunner extends AbstractService {
master = client.getStub();
- TaskRunner taskRunner = new TaskRunner(subQueryId, nodeId, taskOwner, client, master, containerId);
+ TaskRunner taskRunner = new TaskRunner(subQueryId, nodeId, taskOwner, master, containerId);
taskRunner.init(conf);
taskRunner.start();
+ client.close();
+ LOG.info("TaskRunner (" + nodeId + ") main thread exiting");
+ System.exit(0);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/fe5b030c/tajo-core/tajo-core-backend/src/test/java/tajo/worker/TaskRunnerTest.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/tajo/worker/TaskRunnerTest.java b/tajo-core/tajo-core-backend/src/test/java/tajo/worker/TaskRunnerTest.java
index 131e797..7d082bd 100644
--- a/tajo-core/tajo-core-backend/src/test/java/tajo/worker/TaskRunnerTest.java
+++ b/tajo-core/tajo-core-backend/src/test/java/tajo/worker/TaskRunnerTest.java
@@ -58,8 +58,7 @@ public class TaskRunnerTest {
nodeId.setPort(9001);
UserGroupInformation mockTaskOwner = mock(UserGroupInformation.class);
when(mockTaskOwner.getShortUserName()).thenReturn("hyunsik");
- TaskRunner runner = new TaskRunner(sq1, nodeId, mockTaskOwner, mockClient,
- mockMaster, cId);
+ TaskRunner runner = new TaskRunner(sq1, nodeId, mockTaskOwner, mockMaster, cId);
QueryConf conf = new QueryConf();
conf.setOutputPath(new Path("/tmp/" + q1));
runner.init(conf);