You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/08/26 14:29:11 UTC
[2/8] TAJO-127: Implement Tajo Resource Manager. (hyoungjunkim via
hyunsik)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
index a41b280..47ec7bc 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
@@ -26,20 +26,20 @@ import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.QueryConf;
+import org.apache.tajo.QueryId;
import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.SubQueryId;
import org.apache.tajo.TajoProtos.TaskAttemptState;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.engine.query.QueryUnitRequestImpl;
-import org.apache.tajo.ipc.QueryMasterProtocol;
-import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService.*;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.ipc.TajoWorkerProtocol.TajoWorkerProtocolService.Interface;
import org.apache.tajo.rpc.CallFuture2;
import org.apache.tajo.rpc.NullCallback;
import org.apache.tajo.rpc.ProtoAsyncRpcClient;
@@ -51,7 +51,7 @@ import java.security.PrivilegedExceptionAction;
import java.util.Map;
import java.util.concurrent.*;
-import static org.apache.tajo.ipc.QueryMasterProtocol.*;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
/**
* The driver class for Tajo QueryUnit processing.
@@ -60,17 +60,17 @@ public class TaskRunner extends AbstractService {
/** class logger */
private static final Log LOG = LogFactory.getLog(TaskRunner.class);
- private QueryConf conf;
+ private QueryConf queryConf;
private volatile boolean stopped = false;
- private final SubQueryId subQueryId;
- private ApplicationId appId;
- private final NodeId nodeId;
- private final ContainerId containerId;
+ private ExecutionBlockId executionBlockId;
+ private QueryId queryId;
+ private NodeId nodeId;
+ private ContainerId containerId;
// Cluster Management
- private QueryMasterProtocol.QueryMasterProtocolService.Interface master;
+ private TajoWorkerProtocol.TajoWorkerProtocolService.Interface master;
// for temporal or intermediate files
private FileSystem localFS;
@@ -94,7 +94,7 @@ public class TaskRunner extends AbstractService {
private Thread taskLauncher;
// Contains the object references related for TaskRunner
- private WorkerContext workerContext;
+ private TaskRunnerContext taskRunnerContext;
// for the doAs block
private UserGroupInformation taskOwner;
@@ -102,34 +102,89 @@ public class TaskRunner extends AbstractService {
private String baseDir;
private Path baseDirPath;
+ private ProtoAsyncRpcClient client;
+
+ private TaskRunnerManager taskRunnerManager;
+
public TaskRunner(
- final SubQueryId subQueryId,
+ final ExecutionBlockId executionBlockId,
final NodeId nodeId,
UserGroupInformation taskOwner,
Interface master, ContainerId containerId) {
super(TaskRunner.class.getName());
- this.subQueryId = subQueryId;
- this.appId = subQueryId.getQueryId().getApplicationId();
+ this.executionBlockId = executionBlockId;
+ this.queryId = executionBlockId.getQueryId();
this.nodeId = nodeId;
this.taskOwner = taskOwner;
this.master = master;
this.containerId = containerId;
}
- @Override
- public void init(Configuration _conf) {
- this.conf = (QueryConf) _conf;
+ public TaskRunner(TaskRunnerManager taskRunnerManager, QueryConf conf, String[] args) {
+ super(TaskRunner.class.getName());
+ this.taskRunnerManager = taskRunnerManager;
try {
- this.workerContext = new WorkerContext();
+ final ExecutionBlockId executionBlockId = TajoIdUtils.createExecutionBlockId(args[1]);
+
+ conf.setOutputPath(new Path(args[6]));
+
+ LOG.info("NM Local Dir: " + conf.get(ConfVars.TASK_LOCAL_DIR.varname));
+ LOG.info("OUTPUT DIR: " + conf.getOutputPath());
+ LOG.info("Tajo Root Dir: " + conf.getVar(ConfVars.ROOT_DIR));
+
+ UserGroupInformation.setConfiguration(conf);
+
+ // NodeId and ContainerId from String
+ // NodeId has a form of hostname:port.
+ NodeId nodeId = ConverterUtils.toNodeId(args[2]);
+ this.containerId = ConverterUtils.toContainerId(args[3]);
+
+ // QueryMaster's address
+ String host = args[4];
+ int port = Integer.parseInt(args[5]);
+ final InetSocketAddress masterAddr = NetUtils.createSocketAddrForHost(host, port);
+
+ LOG.info("QueryMaster Address:" + masterAddr);
+ // TODO - 'load credential' should be implemented
+ // Getting taskOwner
+ UserGroupInformation taskOwner =
+ UserGroupInformation.createRemoteUser(conf.getVar(ConfVars.QUERY_USERNAME));
+ //taskOwner.addToken(token);
+
+ // initialize the TajoWorkerProtocol RPC client as the actual task owner.
+ this.client =
+ taskOwner.doAs(new PrivilegedExceptionAction<ProtoAsyncRpcClient>() {
+ @Override
+ public ProtoAsyncRpcClient run() throws Exception {
+ return new ProtoAsyncRpcClient(TajoWorkerProtocol.class, masterAddr);
+ }
+ });
+ this.master = client.getStub();
+
+ this.executionBlockId = executionBlockId;
+ this.queryId = executionBlockId.getQueryId();
+ this.nodeId = nodeId;
+ this.taskOwner = taskOwner;
+
+ this.taskRunnerContext = new TaskRunnerContext();
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public void init(Configuration conf) {
+ this.queryConf = (QueryConf)conf;
+ try {
// initialize DFS and LocalFileSystems
- defaultFS = FileSystem.get(URI.create(conf.getVar(ConfVars.ROOT_DIR)),conf);
+ defaultFS = FileSystem.get(URI.create(queryConf.getVar(ConfVars.ROOT_DIR)),conf);
localFS = FileSystem.getLocal(conf);
// the base dir for an output dir
- baseDir = ConverterUtils.toString(appId)
- + "/output" + "/" + subQueryId.getId();
+ baseDir = queryId.toString()
+ + "/output" + "/" + executionBlockId.getId();
// initialize LocalDirAllocator
lDirAllocator = new LocalDirAllocator(ConfVars.TASK_LOCAL_DIR.varname);
@@ -139,9 +194,7 @@ public class TaskRunner extends AbstractService {
// Setup QueryEngine according to the query plan
// Here, we can setup row-based query engine or columnar query engine.
- this.queryEngine = new TajoQueryEngine(conf);
-
- Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook()));
+ this.queryEngine = new TajoQueryEngine(queryConf);
} catch (Throwable t) {
LOG.error(t);
}
@@ -152,39 +205,46 @@ public class TaskRunner extends AbstractService {
@Override
public void start() {
run();
+ super.start();
}
@Override
public void stop() {
- if (!isStopped()) {
- // If TaskRunner is stopped, all running or pending tasks will be marked as failed.
- for (Task task : tasks.values()) {
- if (task.getStatus() == TaskAttemptState.TA_PENDING ||
- task.getStatus() == TaskAttemptState.TA_RUNNING) {
- task.setState(TaskAttemptState.TA_FAILED);
- }
+ if(isStopped()) {
+ return;
+ }
+ // If this flag becomes true, taskLauncher will be terminated.
+ this.stopped = true;
+
+ // If TaskRunner is stopped, all running or pending tasks will be marked as failed.
+ for (Task task : tasks.values()) {
+ if (task.getStatus() == TaskAttemptState.TA_PENDING ||
+ task.getStatus() == TaskAttemptState.TA_RUNNING) {
+ task.setState(TaskAttemptState.TA_FAILED);
}
+ }
- // If this flag become true, taskLauncher will be terminated.
- this.stopped = true;
+ if(client != null) {
+ client.close();
+ client = null;
+ }
- LOG.info("STOPPED: " + nodeId);
- synchronized (this) {
- notifyAll();
- }
+ LOG.info("Stop TaskRunner: " + executionBlockId);
+ synchronized (this) {
+ notifyAll();
}
}
- class WorkerContext {
- public QueryConf getConf() {
- return conf;
+ public class TaskRunnerContext {
+ public QueryConf getQueryConf() {
+ return queryConf;
}
public String getNodeId() {
return nodeId.toString();
}
- public QueryMasterProtocolService.Interface getMaster() {
+ public TajoWorkerProtocolService.Interface getMaster() {
return master;
}
@@ -219,9 +279,17 @@ public class TaskRunner extends AbstractService {
public Path getBaseDir() {
return baseDirPath;
}
+
+ public ExecutionBlockId getExecutionBlockId() {
+ return executionBlockId;
+ }
+ }
+
+ public TaskRunnerContext getContext() {
+ return taskRunnerContext;
}
- static void fatalError(QueryMasterProtocolService.Interface proxy,
+ static void fatalError(TajoWorkerProtocolService.Interface proxy,
QueryUnitAttemptId taskAttemptId, String message) {
TaskFatalErrorReport.Builder builder = TaskFatalErrorReport.newBuilder()
.setId(taskAttemptId.getProto())
@@ -245,17 +313,27 @@ public class TaskRunner extends AbstractService {
try {
if (callFuture == null) {
callFuture = new CallFuture2<QueryUnitRequestProto>();
- master.getTask(null, ((ContainerIdPBImpl) containerId).getProto(),
- callFuture);
+ LOG.info("====>Request GetTask:" + executionBlockId + "," + containerId);
+ GetTaskRequestProto request = GetTaskRequestProto.newBuilder()
+ .setExecutionBlockId(executionBlockId.getProto())
+ .setContainerId(((ContainerIdPBImpl) containerId).getProto())
+ .build();
+ master.getTask(null, request, callFuture);
}
try {
// wait for an assigning task for 3 seconds
taskRequest = callFuture.get(3, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ if(stopped) {
+ break;
+ }
} catch (TimeoutException te) {
+ if(stopped) {
+ break;
+ }
// if there has been no assigning task for a given period,
// TaskRunner will retry to request an assigning task.
- LOG.error(te);
-
+ LOG.warn("Timeout getResource:" + executionBlockId + ", but retry", te);
continue;
}
@@ -264,9 +342,12 @@ public class TaskRunner extends AbstractService {
// If TaskRunner receives the terminal signal, TaskRunner will be terminated
// immediately.
if (taskRequest.getShouldDie()) {
- LOG.info("received ShouldDie flag");
+ LOG.info("Received ShouldDie flag:" + executionBlockId);
stop();
-
+ if(taskRunnerManager != null) {
+ // notify the TaskRunnerManager
+ taskRunnerManager.stopTask(executionBlockId);
+ }
} else {
LOG.info("Accumulated Received Task: " + (++receivedNum));
@@ -280,7 +361,7 @@ public class TaskRunner extends AbstractService {
LOG.info("Initializing: " + taskAttemptId);
Task task;
try {
- task = new Task(taskAttemptId, workerContext, master,
+ task = new Task(taskAttemptId, taskRunnerContext, master,
new QueryUnitRequestImpl(taskRequest));
tasks.put(taskAttemptId, task);
@@ -291,7 +372,7 @@ public class TaskRunner extends AbstractService {
// task.run() is a blocking call.
task.run();
} catch (Throwable t) {
- fatalError(workerContext.getMaster(), taskAttemptId, t.getMessage());
+ fatalError(taskRunnerContext.getMaster(), taskAttemptId, t.getMessage());
} finally {
callFuture = null;
taskRequest = null;
@@ -318,14 +399,6 @@ public class TaskRunner extends AbstractService {
}
}
- private class ShutdownHook implements Runnable {
- @Override
- public void run() {
- LOG.info("received SIGINT Signal");
- stop();
- }
- }
-
/**
* @return true if a stop has been requested.
*/
@@ -333,68 +406,7 @@ public class TaskRunner extends AbstractService {
return this.stopped;
}
- /**
- * TaskRunner takes 5 arguments as follows:
- * <ol>
- * <li>1st: SubQueryId</li>
- * <li>2nd: NodeId</li>
- * <li>3nd: ContainerId</li>
- * <li>4th: QueryMaster hostname</li>
- * <li>5th: QueryMaster port</li>
- * </ol>
- */
- public static void main(String[] args) throws Exception {
- // Restore QueryConf
- final QueryConf conf = new QueryConf();
- conf.addResource(new Path(QueryConf.FILENAME));
-
- LOG.info("MiniTajoYarn NM Local Dir: " + conf.get(ConfVars.TASK_LOCAL_DIR.varname));
- LOG.info("OUTPUT DIR: " + conf.getOutputPath());
- LOG.info("Tajo Root Dir: " + conf.getVar(ConfVars.ROOT_DIR));
-
- UserGroupInformation.setConfiguration(conf);
-
- // SubQueryId from String
- final SubQueryId subQueryId = TajoIdUtils.newSubQueryId(args[0]);
- // NodeId has a form of hostname:port.
- NodeId nodeId = ConverterUtils.toNodeId(args[1]);
- ContainerId containerId = ConverterUtils.toContainerId(args[2]);
-
- // QueryMaster's address
- String host = args[3];
- int port = Integer.parseInt(args[4]);
- final InetSocketAddress masterAddr =
- NetUtils.createSocketAddrForHost(host, port);
-
- // TODO - 'load credential' should be implemented
- // Getting taskOwner
- UserGroupInformation taskOwner =
- UserGroupInformation.createRemoteUser(conf.getVar(ConfVars.QUERY_USERNAME));
- //taskOwner.addToken(token);
-
- // QueryMasterService RPC
- ProtoAsyncRpcClient client;
- QueryMasterProtocolService.Interface master;
-
- // initialize MasterWorkerProtocol as an actual task owner.
- client =
- taskOwner.doAs(new PrivilegedExceptionAction<ProtoAsyncRpcClient>() {
- @Override
- public ProtoAsyncRpcClient run() throws Exception {
- return new ProtoAsyncRpcClient(QueryMasterProtocol.class, masterAddr);
- }
- });
- master = client.getStub();
-
-
- TaskRunner taskRunner = new TaskRunner(subQueryId, nodeId, taskOwner, master, containerId);
- try {
- taskRunner.init(conf);
- taskRunner.start();
- } finally {
- client.close();
- LOG.info("TaskRunner (" + nodeId + ") main thread exiting");
- System.exit(0);
- }
+ public ExecutionBlockId getExecutionBlockId() {
+ return this.executionBlockId;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
new file mode 100644
index 0000000..dcd44df
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.service.CompositeService;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryConf;
+import org.apache.tajo.conf.TajoConf;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class TaskRunnerManager extends CompositeService {
+ private static final Log LOG = LogFactory.getLog(TaskRunnerManager.class);
+
+ private Map<ExecutionBlockId, TaskRunner> taskRunnerMap = new HashMap<ExecutionBlockId, TaskRunner>();
+ private TajoWorker.WorkerContext workerContext;
+ private TajoConf tajoConf;
+ private AtomicBoolean stop = new AtomicBoolean(false);
+
+ public TaskRunnerManager(TajoWorker.WorkerContext workerContext) {
+ super(TaskRunnerManager.class.getName());
+
+ this.workerContext = workerContext;
+ }
+
+ @Override
+ public void init(Configuration conf) {
+ tajoConf = (TajoConf)conf;
+ super.init(tajoConf);
+ }
+
+ @Override
+ public void start() {
+ super.start();
+ }
+
+ @Override
+ public void stop() {
+ if(stop.get()) {
+ return;
+ }
+ stop.set(true);
+ synchronized(taskRunnerMap) {
+ for(TaskRunner eachTaskRunner: taskRunnerMap.values()) {
+ if(!eachTaskRunner.isStopped()) {
+ eachTaskRunner.stop();
+ }
+ }
+ }
+ super.stop();
+ if(!workerContext.isStandbyMode()) {
+ workerContext.stopWorker(true);
+ }
+ }
+
+ public void stopTask(ExecutionBlockId executionBlockId) {
+ LOG.info("Stop Task:" + executionBlockId);
+ synchronized(taskRunnerMap) {
+ taskRunnerMap.remove(executionBlockId);
+ }
+ if(!workerContext.isStandbyMode()) {
+ stop();
+ }
+ }
+
+ public void startTask(final String[] params) {
+ //TODO change to use event dispatcher
+ Thread t = new Thread() {
+ public void run() {
+ try {
+ QueryConf queryConf = new QueryConf(tajoConf);
+ TaskRunner taskRunner = new TaskRunner(TaskRunnerManager.this, queryConf, params);
+ synchronized(taskRunnerMap) {
+ taskRunnerMap.put(taskRunner.getContext().getExecutionBlockId(), taskRunner);
+ }
+ taskRunner.init(queryConf);
+ taskRunner.start();
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ }
+ };
+
+ t.start();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/YarnResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/YarnResourceAllocator.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/YarnResourceAllocator.java
new file mode 100644
index 0000000..9470a88
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/YarnResourceAllocator.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
+import org.apache.hadoop.yarn.client.YarnClient;
+import org.apache.hadoop.yarn.client.YarnClientImpl;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.proto.YarnProtos;
+import org.apache.tajo.QueryConf;
+import org.apache.tajo.master.TaskRunnerGroupEvent;
+import org.apache.tajo.master.TaskRunnerLauncher;
+import org.apache.tajo.master.YarnTaskRunnerLauncherImpl;
+import org.apache.tajo.master.event.ContainerAllocatorEventType;
+import org.apache.tajo.master.querymaster.QueryMasterTask;
+import org.apache.tajo.master.rm.YarnRMContainerAllocator;
+
+public class YarnResourceAllocator extends AbstractResourceAllocator {
+ private YarnRMContainerAllocator rmAllocator;
+
+ private TaskRunnerLauncher taskRunnerLauncher;
+
+ private YarnRPC yarnRPC;
+
+ private YarnClient yarnClient;
+
+ private static final Log LOG = LogFactory.getLog(YarnResourceAllocator.class.getName());
+
+ private QueryMasterTask.QueryContext queryContext;
+
+ private QueryConf queryConf;
+
+ public YarnResourceAllocator(QueryMasterTask.QueryContext queryContext) {
+ this.queryContext = queryContext;
+ }
+
+ @Override
+ public ContainerId makeContainerId(YarnProtos.ContainerIdProto containerId) {
+ return new ContainerIdPBImpl(containerId);
+ }
+
+ @Override
+ public void allocateTaskWorker() {
+ // Currently a no-op (IDE-generated stub body).
+ }
+
+ @Override
+ public void init(Configuration conf) {
+ queryConf = (QueryConf)conf;
+
+ yarnRPC = YarnRPC.create(queryConf);
+
+ connectYarnClient();
+
+ taskRunnerLauncher = new YarnTaskRunnerLauncherImpl(queryContext, yarnRPC);
+ addService((org.apache.hadoop.yarn.service.Service) taskRunnerLauncher);
+ queryContext.getDispatcher().register(TaskRunnerGroupEvent.EventType.class, taskRunnerLauncher);
+
+ rmAllocator = new YarnRMContainerAllocator(queryContext);
+ addService(rmAllocator);
+ queryContext.getDispatcher().register(ContainerAllocatorEventType.class, rmAllocator);
+ super.init(conf);
+ }
+
+ @Override
+ public void stop() {
+ try {
+ this.yarnClient.stop();
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ }
+ super.stop();
+ }
+
+ @Override
+ public void start() {
+ super.start();
+ }
+
+ private void connectYarnClient() {
+ this.yarnClient = new YarnClientImpl();
+ this.yarnClient.init(queryConf);
+ this.yarnClient.start();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/retriever/AdvancedDataRetriever.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/retriever/AdvancedDataRetriever.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/retriever/AdvancedDataRetriever.java
index d602d57..2ef0c4c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/retriever/AdvancedDataRetriever.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/dataserver/retriever/AdvancedDataRetriever.java
@@ -22,14 +22,14 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.handler.codec.http.HttpRequest;
-import org.jboss.netty.handler.codec.http.QueryStringDecoder;
+import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.QueryUnitAttemptId;
import org.apache.tajo.QueryUnitId;
-import org.apache.tajo.SubQueryId;
import org.apache.tajo.util.TajoIdUtils;
import org.apache.tajo.worker.dataserver.FileAccessForbiddenException;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.http.HttpRequest;
+import org.jboss.netty.handler.codec.http.QueryStringDecoder;
import java.io.File;
import java.io.FileNotFoundException;
@@ -81,7 +81,7 @@ public class AdvancedDataRetriever implements DataRetriever {
List<String> qids = splitMaps(params.get("qid"));
for (String qid : qids) {
String[] ids = qid.split("_");
- SubQueryId suid = TajoIdUtils.newSubQueryId(params.get("sid").get(0));
+ ExecutionBlockId suid = TajoIdUtils.createExecutionBlockId(params.get("sid").get(0));
QueryUnitId quid = new QueryUnitId(suid, Integer.parseInt(ids[0]));
QueryUnitAttemptId attemptId = new QueryUnitAttemptId(quid,
Integer.parseInt(ids[1]));
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/proto/ClientProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/ClientProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/ClientProtocol.proto
index 61c14c4..43d99ef 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/ClientProtocol.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/ClientProtocol.proto
@@ -51,13 +51,13 @@ message UpdateQueryResponse {
message SubmitQueryResponse {
required ResultCode resultCode = 1;
- optional ApplicationAttemptIdProto queryId = 2;
+ optional string queryId = 2;
optional string errorMessage = 3;
}
message GetQueryResultRequest {
optional SessionIdProto sessionId = 1;
- required ApplicationAttemptIdProto queryId = 2;
+ required string queryId = 2;
}
message GetQueryResultResponse {
@@ -70,7 +70,7 @@ message GetQueryListRequest {
}
message BriefQueryStatus {
- required ApplicationAttemptIdProto queryId = 1;
+ required string queryId = 1;
required QueryState state = 2;
required int32 executionTime = 3;
}
@@ -81,12 +81,12 @@ message GetQueryListResponse {
message GetQueryStatusRequest {
optional SessionIdProto sessionId = 1;
- required ApplicationAttemptIdProto queryId = 2;
+ required string queryId = 2;
}
message GetQueryStatusResponse {
required ResultCode resultCode = 1;
- required ApplicationAttemptIdProto queryId = 2;
+ required string queryId = 2;
optional QueryState state = 3;
optional float progress = 4;
optional int64 submitTime = 5;
@@ -142,7 +142,7 @@ service ClientProtocolService {
rpc getQueryResult(GetQueryResultRequest) returns (GetQueryResultResponse);
rpc getQueryList(GetQueryListRequest) returns (GetQueryListResponse);
rpc getQueryStatus(GetQueryStatusRequest) returns (GetQueryStatusResponse);
- rpc killQuery(ApplicationAttemptIdProto) returns (BoolProto);
+ rpc killQuery(StringProto) returns (BoolProto);
rpc getClusterInfo(GetClusterInfoRequest) returns (GetClusterInfoResponse);
rpc existTable(StringProto) returns (BoolProto);
rpc getTableList(GetTableListRequest) returns (GetTableListResponse);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/proto/ClientProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/ClientProtos.proto b/tajo-core/tajo-core-backend/src/main/proto/ClientProtos.proto
index 2c5c2b6..f3b1005 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/ClientProtos.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/ClientProtos.proto
@@ -51,13 +51,13 @@ message UpdateQueryResponse {
message SubmitQueryResponse {
required ResultCode resultCode = 1;
- optional ApplicationAttemptIdProto queryId = 2;
+ optional QueryIdProto queryId = 2;
optional string errorMessage = 3;
}
message GetQueryResultRequest {
optional SessionIdProto sessionId = 1;
- required ApplicationAttemptIdProto queryId = 2;
+ required QueryIdProto queryId = 2;
}
message GetQueryResultResponse {
@@ -70,7 +70,7 @@ message GetQueryListRequest {
}
message BriefQueryStatus {
- required ApplicationAttemptIdProto queryId = 1;
+ required QueryIdProto queryId = 1;
required QueryState state = 2;
required int32 executionTime = 3;
}
@@ -81,12 +81,12 @@ message GetQueryListResponse {
message GetQueryStatusRequest {
optional SessionIdProto sessionId = 1;
- required ApplicationAttemptIdProto queryId = 2;
+ required QueryIdProto queryId = 2;
}
message GetQueryStatusResponse {
required ResultCode resultCode = 1;
- required ApplicationAttemptIdProto queryId = 2;
+ required QueryIdProto queryId = 2;
optional QueryState state = 3;
optional float progress = 4;
optional int64 submitTime = 5;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/proto/QueryMasterClientProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/QueryMasterClientProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/QueryMasterClientProtocol.proto
index 9337078..7da83bc 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/QueryMasterClientProtocol.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/QueryMasterClientProtocol.proto
@@ -32,5 +32,5 @@ service QueryMasterClientProtocolService {
rpc updateSessionVariables(UpdateSessionVariableRequest) returns (BoolProto);
rpc getQueryResult(GetQueryResultRequest) returns (GetQueryResultResponse);
rpc getQueryStatus(GetQueryStatusRequest) returns (GetQueryStatusResponse);
- rpc killQuery(ApplicationAttemptIdProto) returns (BoolProto);
+ rpc killQuery(QueryIdProto) returns (BoolProto);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/proto/QueryMasterManagerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/QueryMasterManagerProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/QueryMasterManagerProtocol.proto
deleted file mode 100644
index 08fc5c9..0000000
--- a/tajo-core/tajo-core-backend/src/main/proto/QueryMasterManagerProtocol.proto
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-option java_package = "org.apache.tajo.ipc";
-option java_outer_classname = "QueryMasterManagerProtocol";
-option java_generic_services = true;
-option java_generate_equals_and_hash = true;
-
-import "yarn_protos.proto";
-import "tajo_protos.proto";
-import "TajoIdProtos.proto";
-import "CatalogProtos.proto";
-import "PrimitiveProtos.proto";
-
-message QueryHeartbeat {
- required ApplicationAttemptIdProto queryId = 1;
- required string queryMasterHost = 2;
- required int32 queryMasterPort = 3;
- required int32 queryMasterClientPort = 4;
- required QueryState state = 5;
- optional string statusMessage = 6;
-}
-
-message QueryHeartbeatResponse {
- message ResponseCommand {
- required string command = 1;
- repeated string params = 2;
- }
- required BoolProto heartbeatResult = 1;
- optional ResponseCommand responseCommand = 3;
-}
-
-service QueryMasterManagerProtocolService {
- rpc queryHeartbeat(QueryHeartbeat) returns (QueryHeartbeatResponse);
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/proto/QueryMasterProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/QueryMasterProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/QueryMasterProtocol.proto
deleted file mode 100644
index b6a0602..0000000
--- a/tajo-core/tajo-core-backend/src/main/proto/QueryMasterProtocol.proto
+++ /dev/null
@@ -1,132 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-option java_package = "org.apache.tajo.ipc";
-option java_outer_classname = "QueryMasterProtocol";
-option java_generic_services = true;
-option java_generate_equals_and_hash = true;
-
-import "yarn_protos.proto";
-import "tajo_protos.proto";
-import "TajoIdProtos.proto";
-import "CatalogProtos.proto";
-import "PrimitiveProtos.proto";
-
-message TaskStatusProto {
- required QueryUnitAttemptIdProto id = 1;
- required string workerName = 2;
- required float progress = 3;
- required TaskAttemptState state = 4;
- optional StatSetProto stats = 5;
- optional TableStatProto resultStats = 6;
- repeated Partition partitions = 7;
-}
-
-message TaskCompletionReport {
- required QueryUnitAttemptIdProto id = 1;
- optional StatSetProto stats = 2;
- optional TableStatProto resultStats = 3;
- repeated Partition partitions = 4;
-}
-
-message TaskFatalErrorReport {
- required QueryUnitAttemptIdProto id = 1;
- optional string error_message = 2;
-}
-
-message QueryUnitRequestProto {
- required QueryUnitAttemptIdProto id = 1;
- repeated FragmentProto fragments = 2;
- required string outputTable = 3;
- required bool clusteredOutput = 4;
- required string serializedData = 5;
- optional bool interQuery = 6 [default = false];
- repeated Fetch fetches = 7;
- optional bool shouldDie = 8;
-}
-
-message Fetch {
- required string name = 1;
- required string urls = 2;
-}
-
-message QueryUnitResponseProto {
- required string id = 1;
- required QueryState status = 2;
-}
-
-message StatusReportProto {
- required int64 timestamp = 1;
- required string serverName = 2;
- repeated TaskStatusProto status = 3;
- repeated QueryUnitAttemptIdProto pings = 4;
-}
-
-message CommandRequestProto {
- repeated Command command = 1;
-}
-
-message CommandResponseProto {
-}
-
-message Command {
- required QueryUnitAttemptIdProto id = 1;
- required CommandType type = 2;
-}
-
-enum CommandType {
- PREPARE = 0;
- LAUNCH = 1;
- STOP = 2;
- FINALIZE = 3;
-}
-
-message Partition {
- required int32 partitionKey = 1;
- optional string fileName = 2;
-}
-
-message ServerStatusProto {
- message System {
- required int32 availableProcessors = 1;
- required int64 freeMemory = 2;
- required int64 maxMemory = 3;
- required int64 totalMemory = 4;
- }
- message Disk {
- required string absolutePath = 1;
- required int64 totalSpace = 2;
- required int64 freeSpace = 3;
- required int64 usableSpace = 4;
- }
- required System system = 1;
- repeated Disk disk = 2;
- required int32 taskNum = 3;
-}
-
-service QueryMasterProtocolService {
- //from Worker
- rpc getTask(ContainerIdProto) returns (QueryUnitRequestProto);
- rpc statusUpdate (TaskStatusProto) returns (BoolProto);
- rpc ping (QueryUnitAttemptIdProto) returns (BoolProto);
- rpc fatalError(TaskFatalErrorReport) returns (BoolProto);
- rpc done (TaskCompletionReport) returns (BoolProto);
-
- //from QueryMasterManager
- rpc executeQuery(StringProto) returns (BoolProto);
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/proto/TajoIdProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/TajoIdProtos.proto b/tajo-core/tajo-core-backend/src/main/proto/TajoIdProtos.proto
index 04c67f2..a87c825 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/TajoIdProtos.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/TajoIdProtos.proto
@@ -21,15 +21,18 @@ option java_outer_classname = "TajoIdProtos";
option java_generic_services = false;
option java_generate_equals_and_hash = true;
-import "yarn_protos.proto";
+message QueryIdProto {
+ required string id = 1;
+ required int32 seq = 2;
+}
-message SubQueryIdProto {
- required ApplicationAttemptIdProto queryId = 1;
+message ExecutionBlockIdProto {
+ required QueryIdProto queryId = 1;
required int32 id = 2;
}
message QueryUnitIdProto {
- required SubQueryIdProto subQueryId = 1;
+ required ExecutionBlockIdProto executionBlockId = 1;
required int32 id = 2;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/proto/TajoMasterClientProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/TajoMasterClientProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/TajoMasterClientProtocol.proto
index ef7e711..26dbbed 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/TajoMasterClientProtocol.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/TajoMasterClientProtocol.proto
@@ -16,6 +16,7 @@
* limitations under the License.
*/
+//TajoClient -> TajoMaster Protocol
option java_package = "org.apache.tajo.ipc";
option java_outer_classname = "TajoMasterClientProtocol";
option java_generic_services = true;
@@ -30,12 +31,12 @@ import "ClientProtos.proto";
service TajoMasterClientProtocolService {
rpc updateSessionVariables(UpdateSessionVariableRequest) returns (BoolProto);
- rpc submitQuery(QueryRequest) returns (SubmitQueryResponse);
+ rpc submitQuery(QueryRequest) returns (GetQueryStatusResponse);
rpc updateQuery(QueryRequest) returns (UpdateQueryResponse);
rpc getQueryResult(GetQueryResultRequest) returns (GetQueryResultResponse);
rpc getQueryList(GetQueryListRequest) returns (GetQueryListResponse);
rpc getQueryStatus(GetQueryStatusRequest) returns (GetQueryStatusResponse);
- rpc killQuery(ApplicationAttemptIdProto) returns (BoolProto);
+ rpc killQuery(QueryIdProto) returns (BoolProto);
rpc getClusterInfo(GetClusterInfoRequest) returns (GetClusterInfoResponse);
rpc existTable(StringProto) returns (BoolProto);
rpc getTableList(GetTableListRequest) returns (GetTableListResponse);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/proto/TajoMasterProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/TajoMasterProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/TajoMasterProtocol.proto
new file mode 100644
index 0000000..0153c8d
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/proto/TajoMasterProtocol.proto
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+//TajoWorker -> TajoMaster protocol
+
+option java_package = "org.apache.tajo.ipc";
+option java_outer_classname = "TajoMasterProtocol";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+import "yarn_protos.proto";
+import "tajo_protos.proto";
+import "TajoIdProtos.proto";
+import "CatalogProtos.proto";
+import "PrimitiveProtos.proto";
+
+message ServerStatusProto {
+ message System {
+ required int32 availableProcessors = 1;
+ required int32 freeMemoryMB = 2;
+ required int32 maxMemoryMB = 3;
+ required int32 totalMemoryMB = 4;
+ }
+ message Disk {
+ required string absolutePath = 1;
+ required int64 totalSpace = 2;
+ required int64 freeSpace = 3;
+ required int64 usableSpace = 4;
+ }
+ required System system = 1;
+ required int32 diskSlots = 2;
+ repeated Disk disk = 3;
+ required int32 runningTaskNum = 4;
+}
+
+message TajoHeartbeat {
+ required string tajoWorkerHost = 1;
+ required int32 tajoWorkerPort = 2;
+ optional ServerStatusProto serverStatus = 3;
+ optional int32 tajoWorkerClientPort = 4;
+ optional QueryIdProto queryId = 5;
+ optional QueryState state = 6;
+ optional string statusMessage = 7;
+}
+
+message TajoHeartbeatResponse {
+ message ResponseCommand {
+ required string command = 1;
+ repeated string params = 2;
+ }
+ required BoolProto heartbeatResult = 1;
+ optional ResponseCommand responseCommand = 3;
+}
+
+message WorkerResourceAllocationRequest {
+ required ExecutionBlockIdProto executionBlockId = 1;
+ required int32 numWorks = 2;
+ required int32 memoryMBSlots = 3 ;
+ required int32 diskSlots = 4;
+}
+
+message WorkerResourceProto {
+ required string workerHostAndPort = 1;
+ required ExecutionBlockIdProto executionBlockId = 2;
+ required int32 memoryMBSlots = 3 ;
+ required int32 diskSlots = 4;
+}
+
+message WorkerResourceReleaseRequest {
+ repeated WorkerResourceProto workerResources = 1;
+}
+
+message WorkerResourceAllocationResponse {
+ required ExecutionBlockIdProto executionBlockId = 1;
+ repeated string allocatedWorks = 2;
+}
+
+service TajoMasterProtocolService {
+ rpc heartbeat(TajoHeartbeat) returns (TajoHeartbeatResponse);
+ rpc allocateWorkerResources(WorkerResourceAllocationRequest) returns (WorkerResourceAllocationResponse);
+ rpc releaseWorkerResource(WorkerResourceReleaseRequest) returns (BoolProto);
+ rpc stopQueryMaster(QueryIdProto) returns (BoolProto);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
new file mode 100644
index 0000000..88a2029
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// TajoMaster -> TajoWorker, TajoWorker(QueryMaster) <-> TajoWorker Protocol
+option java_package = "org.apache.tajo.ipc";
+option java_outer_classname = "TajoWorkerProtocol";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+import "yarn_protos.proto";
+import "tajo_protos.proto";
+import "TajoIdProtos.proto";
+import "CatalogProtos.proto";
+import "PrimitiveProtos.proto";
+
+message TaskStatusProto {
+ required QueryUnitAttemptIdProto id = 1;
+ required string workerName = 2;
+ required float progress = 3;
+ required TaskAttemptState state = 4;
+ optional StatSetProto stats = 5;
+ optional TableStatProto resultStats = 6;
+ repeated Partition partitions = 7;
+}
+
+message TaskCompletionReport {
+ required QueryUnitAttemptIdProto id = 1;
+ optional StatSetProto stats = 2;
+ optional TableStatProto resultStats = 3;
+ repeated Partition partitions = 4;
+}
+
+message TaskFatalErrorReport {
+ required QueryUnitAttemptIdProto id = 1;
+ optional string error_message = 2;
+}
+
+message QueryUnitRequestProto {
+ required QueryUnitAttemptIdProto id = 1;
+ repeated FragmentProto fragments = 2;
+ required string outputTable = 3;
+ required bool clusteredOutput = 4;
+ required string serializedData = 5;
+ optional bool interQuery = 6 [default = false];
+ repeated Fetch fetches = 7;
+ optional bool shouldDie = 8;
+}
+
+message Fetch {
+ required string name = 1;
+ required string urls = 2;
+}
+
+message QueryUnitResponseProto {
+ required string id = 1;
+ required QueryState status = 2;
+}
+
+message StatusReportProto {
+ required int64 timestamp = 1;
+ required string serverName = 2;
+ repeated TaskStatusProto status = 3;
+ repeated QueryUnitAttemptIdProto pings = 4;
+}
+
+message CommandRequestProto {
+ repeated Command command = 1;
+}
+
+message CommandResponseProto {
+}
+
+message Command {
+ required QueryUnitAttemptIdProto id = 1;
+ required CommandType type = 2;
+}
+
+enum CommandType {
+ PREPARE = 0;
+ LAUNCH = 1;
+ STOP = 2;
+ FINALIZE = 3;
+}
+
+message Partition {
+ required int32 partitionKey = 1;
+ optional string fileName = 2;
+}
+
+message QueryExecutionRequestProto {
+ required QueryIdProto queryId = 1;
+ required StringProto logicalPlanJson = 2;
+}
+
+message GetTaskRequestProto {
+ required ContainerIdProto containerId = 1;
+ required ExecutionBlockIdProto executionBlockId = 2;
+}
+
+message RunExecutionBlockRequestProto {
+ required string executionBlockId = 1;
+ required string queryMasterHost = 2;
+ required int32 queryMasterPort = 3;
+ required string nodeId = 4;
+ required string containerId = 5;
+ optional string queryOutputPath = 6;
+}
+
+service TajoWorkerProtocolService {
+ //from Worker
+ rpc getTask(GetTaskRequestProto) returns (QueryUnitRequestProto);
+ rpc statusUpdate (TaskStatusProto) returns (BoolProto);
+ rpc ping (QueryUnitAttemptIdProto) returns (BoolProto);
+ rpc fatalError(TaskFatalErrorReport) returns (BoolProto);
+ rpc done (TaskCompletionReport) returns (BoolProto);
+
+ //from TajoMaster's QueryJobManager
+ rpc executeQuery(QueryExecutionRequestProto) returns (BoolProto);
+
+ //from QueryMaster(Worker)
+ rpc executeExecutionBlock(RunExecutionBlockRequestProto) returns (BoolProto);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/log4j.properties b/tajo-core/tajo-core-backend/src/main/resources/log4j.properties
index 2b42975..007c8f5 100644
--- a/tajo-core/tajo-core-backend/src/main/resources/log4j.properties
+++ b/tajo-core/tajo-core-backend/src/main/resources/log4j.properties
@@ -23,3 +23,6 @@ log4j.threshhold=INFO
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+log4j.logger.org.apache.hadoop=WARN
+log4j.logger.org.apache.hadoop.conf=ERROR
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml b/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml
index a1a111c..e8ad503 100644
--- a/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml
+++ b/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml
@@ -43,4 +43,72 @@
</property>
+
+ <property>
+ <name>tajo.master.clientservice.addr</name>
+ <value>127.0.0.1:9004</value>
+ </property>
+
+ <property>
+ <name>tajo.master.manager.addr</name>
+ <value>127.0.0.1:9005</value>
+ <description>rpc port for tajo worker</description>
+ </property>
+
+ <property>
+ <name>tajo.query.session.timeout</name>
+ <value>60000</value>
+ <description>ms</description>
+ </property>
+
+ <property>
+ <name>tajo.resource.manager</name>
+ <value>org.apache.tajo.master.rm.TajoWorkerResourceManager</value>
+ <description>This can be org.apache.tajo.master.rm.TajoWorkerResourceManager or org.apache.tajo.master.rm.YarnTajoResourceManager</description>
+ </property>
+
+ <property>
+ <name>tajo.querymaster.memoryMB</name>
+ <value>512</value>
+ <description>the memory slot size for a QueryMaster</description>
+ </property>
+
+ <property>
+ <name>tajo.worker.slots.use.os.info</name>
+ <value>true</value>
+ <description>If true, Tajo system obtains the physical resource information from OS.
+ If false, the physical resource information is obtained from the below configs.</description>
+ </property>
+
+ <!-- Default Node's Physical information -->
+ <!-- The below configs are used if tajo.worker.slots.use.os.info is set to true. -->
+ <property>
+ <name>tajo.worker.slots.os.memory.ratio</name>
+ <value>0.8f</value>
+ <description>The ratio of allocatable memory to the total system memory</description>
+ </property>
+
+ <property>
+ <name>tajo.worker.slots.memoryMB</name>
+ <value>2048</value>
+ <description></description>
+ </property>
+
+ <property>
+ <name>tajo.worker.slots.disk</name>
+ <value>2</value>
+ <description>The number of disks on a worker</description>
+ </property>
+
+ <property>
+ <name>tajo.worker.slots.disk.concurrency</name>
+ <value>4</value>
+ <description>the maximum concurrency number per disk slot</description>
+ </property>
+
+ <property>
+ <name>tajo.worker.slots.cpu.core</name>
+ <value>4</value>
+ <description>The number of CPU cores on a worker</description>
+ </property>
</configuration>
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/test/java/log4j.properties
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/log4j.properties b/tajo-core/tajo-core-backend/src/test/java/log4j.properties
index c1ac487..749124c 100644
--- a/tajo-core/tajo-core-backend/src/test/java/log4j.properties
+++ b/tajo-core/tajo-core-backend/src/test/java/log4j.properties
@@ -23,3 +23,6 @@ log4j.threshhold=ALL
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+log4j.logger.org.apache.hadoop=WARN
+log4j.logger.org.apache.hadoop.conf=ERROR
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
index 5e8d11d..1667813 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
@@ -23,7 +23,10 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Options;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.client.TajoClient;
import org.apache.tajo.conf.TajoConf;
@@ -83,7 +86,11 @@ public class LocalTajoTestingUtility {
}
public void shutdown() throws IOException {
- client.close();
- util.shutdownMiniCluster();
+ if(client != null) {
+ client.close();
+ }
+ if(util != null) {
+ util.shutdownMiniCluster();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/MiniTajoYarnCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/MiniTajoYarnCluster.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/MiniTajoYarnCluster.java
index 5b7267f..37e2721 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/MiniTajoYarnCluster.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/MiniTajoYarnCluster.java
@@ -110,9 +110,6 @@ public class MiniTajoYarnCluster extends MiniYARNCluster {
conf.setInt("yarn.nodemanager.delete.debug-delay-sec", 600);
- // Disable virtual memory constraints for containers
- conf.setBoolean("yarn.nodemanager.vmem-check-enabled", false);
-
super.init(conf);
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
index 32b1f56..041043d 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -27,9 +27,8 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.proto.CatalogProtos;
@@ -37,12 +36,17 @@ import org.apache.tajo.client.TajoClient;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.master.TajoMaster;
+import org.apache.tajo.master.rm.TajoWorkerResourceManager;
import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.worker.TajoWorker;
import java.io.*;
+import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URL;
import java.sql.ResultSet;
+import java.util.ArrayList;
+import java.util.List;
import java.util.UUID;
public class TajoTestingCluster {
@@ -54,8 +58,9 @@ public class TajoTestingCluster {
private MiniDFSCluster dfsCluster;
private MiniCatalogServer catalogServer;
-
private TajoMaster tajoMaster;
+ private List<TajoWorker> tajoWorkers = new ArrayList<TajoWorker>();
+ private boolean standbyWorkerMode = false;
// If non-null, then already a cluster running.
private File clusterTestBuildDir = null;
@@ -73,7 +78,10 @@ public class TajoTestingCluster {
public static final String DEFAULT_TEST_DIRECTORY = "target/test-data";
public TajoTestingCluster() {
- this.conf = new TajoConf();
+ this.conf = new TajoConf();
+ this.standbyWorkerMode =
+ this.conf.get("tajo.resource.manager", TajoWorkerResourceManager.class.getCanonicalName())
+ .indexOf(TajoWorkerResourceManager.class.getName()) >= 0;
}
public TajoConf getConfiguration() {
@@ -113,7 +121,7 @@ public class TajoTestingCluster {
String dirStr = getTestDir(randomStr).toString();
File dir = new File(dirStr).getAbsoluteFile();
// Have it cleaned up on exit
- dir.deleteOnExit();
+ //dir.deleteOnExit();
return dir;
}
@@ -155,8 +163,7 @@ public class TajoTestingCluster {
System.setProperty(MiniDFSCluster.PROP_TEST_BUILD_DATA,
this.clusterTestBuildDir.toString());
- conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
- MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(new HdfsConfiguration(conf));
+ MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
builder.hosts(hosts);
builder.numDataNodes(servers);
builder.format(true);
@@ -210,7 +217,7 @@ public class TajoTestingCluster {
catalogServer = new MiniCatalogServer(conf);
CatalogServer catServer = catalogServer.getCatalogServer();
InetSocketAddress sockAddr = catServer.getBindAddress();
- c.setVar(ConfVars.CATALOG_ADDRESS, NetUtils.normalizeInetSocketAddress(sockAddr));
+ c.setVar(ConfVars.CATALOG_ADDRESS, NetUtils.getIpPortString(sockAddr));
return this.catalogServer;
}
@@ -232,11 +239,12 @@ public class TajoTestingCluster {
TajoConf c = getConfiguration();
c.setVar(ConfVars.TASKRUNNER_LISTENER_ADDRESS, "localhost:0");
c.setVar(ConfVars.CLIENT_SERVICE_ADDRESS, "localhost:0");
- c.setVar(ConfVars.QUERY_MASTER_MANAGER_SERVICE_ADDRESS, "localhost:0");
+ c.setVar(ConfVars.TAJO_MASTER_SERVICE_ADDRESS, "localhost:0");
c.setVar(ConfVars.CATALOG_ADDRESS, "localhost:0");
c.set(CatalogConstants.STORE_CLASS, "org.apache.tajo.catalog.store.MemStore");
c.set(CatalogConstants.JDBC_URI, "jdbc:derby:target/test-data/tcat/db");
+
LOG.info("derby repository is set to "+conf.get(CatalogConstants.JDBC_URI));
if (!local) {
@@ -253,11 +261,38 @@ public class TajoTestingCluster {
this.conf.setVar(ConfVars.TASKRUNNER_LISTENER_ADDRESS, c.getVar(ConfVars.TASKRUNNER_LISTENER_ADDRESS));
this.conf.setVar(ConfVars.CLIENT_SERVICE_ADDRESS, c.getVar(ConfVars.CLIENT_SERVICE_ADDRESS));
+
+ InetSocketAddress tajoMasterAddress = tajoMaster.getContext().getTajoMasterService().getBindAddress();
+
+ this.conf.setVar(ConfVars.TAJO_MASTER_SERVICE_ADDRESS,
+ tajoMasterAddress.getHostName() + ":" + tajoMasterAddress.getPort());
+
this.conf.setVar(ConfVars.CATALOG_ADDRESS, c.getVar(ConfVars.CATALOG_ADDRESS));
+ if(standbyWorkerMode) {
+ startTajoWorkers(numSlaves);
+ }
LOG.info("Mini Tajo cluster is up");
}
+ private void startTajoWorkers(int numSlaves) throws Exception {
+ for(int i = 0; i < 1; i++) {
+ TajoWorker tajoWorker = new TajoWorker("all");
+
+ TajoConf workerConf = new TajoConf(this.conf);
+
+ workerConf.setInt("tajo.worker.info.port", 0);
+ workerConf.setInt("tajo.worker.client.rpc.port", 0);
+ workerConf.setInt("tajo.worker.manager.rpc.port", 0);
+ workerConf.setInt(TajoConf.ConfVars.PULLSERVER_PORT.varname, 0);
+
+ tajoWorker.startWorker(workerConf, new String[]{"standby"});
+
+ LOG.info("=====> MiniTajoCluster Worker #" + (i + 1) + " started.");
+ tajoWorkers.add(tajoWorker);
+ }
+ }
+
public void restartTajoCluster(int numSlaves) throws Exception {
tajoMaster.stop();
tajoMaster.start();
@@ -273,6 +308,10 @@ public class TajoTestingCluster {
if(this.tajoMaster != null) {
this.tajoMaster.stop();
}
+ for(TajoWorker eachWorker: tajoWorkers) {
+ eachWorker.stopWorkerForce();
+ }
+ tajoWorkers.clear();
this.tajoMaster= null;
}
@@ -297,7 +336,8 @@ public class TajoTestingCluster {
*/
public void startMiniCluster(final int numSlaves)
throws Exception {
- startMiniCluster(numSlaves, null);
+ String localHostName = InetAddress.getLocalHost().getHostName();
+ startMiniCluster(numSlaves, new String[] {localHostName});
}
public void startMiniCluster(final int numSlaves,
@@ -331,17 +371,19 @@ public class TajoTestingCluster {
startMiniDFSCluster(numDataNodes, this.clusterTestBuildDir, dataNodeHosts);
this.dfsCluster.waitClusterUp();
+ if(!standbyWorkerMode) {
+ startMiniYarnCluster();
+ }
+
+ startMiniTajoCluster(this.clusterTestBuildDir, numSlaves, false);
+ }
+
+ private void startMiniYarnCluster() throws Exception {
LOG.info("Starting up YARN cluster");
// Scheduler properties required for YARN to work
conf.set("yarn.scheduler.capacity.root.queues", "default");
conf.set("yarn.scheduler.capacity.root.default.capacity", "100");
- // fixed thread OOM
- conf.setInt(YarnConfiguration.RM_CLIENT_THREAD_COUNT, 2);
- conf.setInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT, 2);
- conf.setInt(YarnConfiguration.RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT, 2);
- conf.setInt(YarnConfiguration.NM_CONTAINER_MGR_THREAD_COUNT, 2);
-
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 384);
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 3000);
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, 1);
@@ -368,8 +410,6 @@ public class TajoTestingCluster {
yarnCluster.getConfig().writeXml(os);
os.close();
}
-
- startMiniTajoCluster(this.clusterTestBuildDir, numSlaves, false);
}
public void startMiniClusterInLocal(final int numSlaves) throws Exception {
@@ -413,9 +453,12 @@ public class TajoTestingCluster {
}
if(this.clusterTestBuildDir != null && this.clusterTestBuildDir.exists()) {
- LocalFileSystem localFS = LocalFileSystem.getLocal(conf);
- localFS.delete(
- new Path(clusterTestBuildDir.toString()), true);
+ if(!ShutdownHookManager.get().isShutdownInProgress()) {
+ //TODO clean test dir when ShutdownInProgress
+ LocalFileSystem localFS = LocalFileSystem.getLocal(conf);
+ localFS.delete(
+ new Path(clusterTestBuildDir.toString()), true);
+ }
this.clusterTestBuildDir = null;
}
@@ -457,6 +500,12 @@ public class TajoTestingCluster {
String query) throws Exception {
TpchTestBase instance = TpchTestBase.getInstance();
TajoTestingCluster util = instance.getTestingCluster();
+ while(true) {
+ if(util.getMaster().isMasterRunning()) {
+ break;
+ }
+ Thread.sleep(1000);
+ }
TajoConf conf = util.getConfiguration();
TajoClient client = new TajoClient(conf);
@@ -503,33 +552,4 @@ public class TajoTestingCluster {
Closeables.closeQuietly(writer);
}
}
-
-
- /**
- * @param args
- * @throws Exception
- */
- public static void main(String[] args) throws Exception {
- TajoTestingCluster cluster = new TajoTestingCluster();
- File f = cluster.setupClusterTestBuildDir();
- System.out.println("first setupClusterTestBuildDir: " + f);
- f = cluster.setupClusterTestBuildDir();
- System.out.println("second setupClusterTestBuildDir: " + f);
- f = cluster.getTestDir();
- System.out.println("getTestDir() after second: " + f);
- f = cluster.getTestDir("abc");
- System.out.println("getTestDir(\"abc\") after second: " + f);
-
- cluster.initTestDir();
- f = cluster.getTestDir();
- System.out.println("getTestDir() after initTestDir: " + f);
- f = cluster.getTestDir("abc");
- System.out.println("getTestDir(\"abc\") after initTestDir: " + f);
- f = cluster.setupClusterTestBuildDir();
- System.out.println("setupClusterTestBuildDir() after initTestDir: " + f);
-
- TajoTestingCluster cluster2 = new TajoTestingCluster();
- File f2 = cluster2.setupClusterTestBuildDir();
- System.out.println("first setupClusterTestBuildDir of cluster2: " + f2);
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TestQueryIdFactory.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TestQueryIdFactory.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TestQueryIdFactory.java
index b4d920f..7b82952 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TestQueryIdFactory.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TestQueryIdFactory.java
@@ -27,7 +27,6 @@ public class TestQueryIdFactory {
@Before
public void setup() {
- QueryIdFactory.reset();
}
@Test
@@ -40,15 +39,15 @@ public class TestQueryIdFactory {
@Test
public void testNewSubQueryId() {
QueryId qid = QueryIdFactory.newQueryId();
- SubQueryId subqid1 = QueryIdFactory.newSubQueryId(qid);
- SubQueryId subqid2 = QueryIdFactory.newSubQueryId(qid);
+ ExecutionBlockId subqid1 = QueryIdFactory.newExecutionBlockId(qid);
+ ExecutionBlockId subqid2 = QueryIdFactory.newExecutionBlockId(qid);
assertTrue(subqid1.compareTo(subqid2) < 0);
}
@Test
public void testNewQueryUnitId() {
QueryId qid = QueryIdFactory.newQueryId();
- SubQueryId subid = QueryIdFactory.newSubQueryId(qid);
+ ExecutionBlockId subid = QueryIdFactory.newExecutionBlockId(qid);
QueryUnitId quid1 = QueryIdFactory.newQueryUnitId(subid);
QueryUnitId quid2 = QueryIdFactory.newQueryUnitId(subid);
assertTrue(quid1.compareTo(quid2) < 0);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TestTajoIds.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TestTajoIds.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TestTajoIds.java
index 386fe02..1997159 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TestTajoIds.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TestTajoIds.java
@@ -20,8 +20,8 @@ package org.apache.tajo;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.util.BuilderUtils;
-import org.junit.Test;
import org.apache.tajo.util.TajoIdUtils;
+import org.junit.Test;
import static org.junit.Assert.*;
@@ -31,10 +31,10 @@ public class TestTajoIds {
long ts1 = 1315890136000l;
long ts2 = 1315890136001l;
- QueryId j1 = createQueryId(ts1, 2, 1);
- QueryId j2 = createQueryId(ts1, 1, 2);
- QueryId j3 = createQueryId(ts2, 1, 2);
- QueryId j4 = createQueryId(ts1, 2, 1);
+ QueryId j1 = createQueryId(ts1, 2);
+ QueryId j2 = createQueryId(ts1, 1);
+ QueryId j3 = createQueryId(ts2, 1);
+ QueryId j4 = createQueryId(ts1, 2);
assertTrue(j1.equals(j4));
assertFalse(j1.equals(j2));
@@ -48,42 +48,42 @@ public class TestTajoIds {
assertFalse(j1.hashCode() == j2.hashCode());
assertFalse(j1.hashCode() == j3.hashCode());
- QueryId j5 = createQueryId(ts1, 231415, 2);
- assertEquals("q_" + ts1 + "_0002_000001", j1.toString());
- assertEquals("q_" + ts1 + "_231415_000002", j5.toString());
+ QueryId j5 = createQueryId(ts1, 231415);
+ assertEquals("q_" + ts1 + "_0002", j1.toString());
+ assertEquals("q_" + ts1 + "_231415", j5.toString());
}
@Test
public void testQueryIds() {
long timeId = 1315890136000l;
- QueryId queryId = createQueryId(timeId, 1, 1);
- assertEquals("q_" + timeId + "_0001_000001", queryId.toString());
+ QueryId queryId = createQueryId(timeId, 1);
+ assertEquals("q_" + timeId + "_0001", queryId.toString());
- SubQueryId subId = TajoIdUtils.newSubQueryId(queryId, 2);
- assertEquals("sq_" + timeId +"_0001_000001_02", subId.toString());
+ ExecutionBlockId subId = QueryIdFactory.newExecutionBlockId(queryId, 2);
+ assertEquals("eb_" + timeId +"_0001_000002", subId.toString());
QueryUnitId qId = new QueryUnitId(subId, 5);
- assertEquals("t_" + timeId + "_0001_000001_02_000005", qId.toString());
+ assertEquals("t_" + timeId + "_0001_000002_000005", qId.toString());
QueryUnitAttemptId attemptId = new QueryUnitAttemptId(qId, 4);
- assertEquals("ta_" + timeId + "_0001_000001_02_000005_04", attemptId.toString());
+ assertEquals("ta_" + timeId + "_0001_000002_000005_04", attemptId.toString());
}
@Test
public void testEqualsObject() {
long timeId = System.currentTimeMillis();
- QueryId queryId1 = createQueryId(timeId, 1, 1);
- QueryId queryId2 = createQueryId(timeId, 2, 2);
+ QueryId queryId1 = createQueryId(timeId, 1);
+ QueryId queryId2 = createQueryId(timeId, 2);
assertNotSame(queryId1, queryId2);
- QueryId queryId3 = createQueryId(timeId, 1, 1);
+ QueryId queryId3 = createQueryId(timeId, 1);
assertEquals(queryId1, queryId3);
- SubQueryId sid1 = TajoIdUtils.newSubQueryId(queryId1, 1);
- SubQueryId sid2 = TajoIdUtils.newSubQueryId(queryId1, 2);
+ ExecutionBlockId sid1 = QueryIdFactory.newExecutionBlockId(queryId1, 1);
+ ExecutionBlockId sid2 = QueryIdFactory.newExecutionBlockId(queryId1, 2);
assertNotSame(sid1, sid2);
- SubQueryId sid3 = TajoIdUtils.newSubQueryId(queryId1, 1);
+ ExecutionBlockId sid3 = QueryIdFactory.newExecutionBlockId(queryId1, 1);
assertEquals(sid1, sid3);
QueryUnitId qid1 = new QueryUnitId(sid1, 9);
@@ -97,16 +97,16 @@ public class TestTajoIds {
public void testCompareTo() {
long time = System.currentTimeMillis();
- QueryId queryId1 = createQueryId(time, 1, 1);
- QueryId queryId2 = createQueryId(time, 2, 2);
- QueryId queryId3 = createQueryId(time, 1, 1);
+ QueryId queryId1 = createQueryId(time, 1);
+ QueryId queryId2 = createQueryId(time, 2);
+ QueryId queryId3 = createQueryId(time, 1);
assertEquals(-1, queryId1.compareTo(queryId2));
assertEquals(1, queryId2.compareTo(queryId1));
assertEquals(0, queryId3.compareTo(queryId1));
-
- SubQueryId sid1 = TajoIdUtils.newSubQueryId(queryId1, 1);
- SubQueryId sid2 = TajoIdUtils.newSubQueryId(queryId1, 2);
- SubQueryId sid3 = TajoIdUtils.newSubQueryId(queryId1, 1);
+
+ ExecutionBlockId sid1 = QueryIdFactory.newExecutionBlockId(queryId1, 1);
+ ExecutionBlockId sid2 = QueryIdFactory.newExecutionBlockId(queryId1, 2);
+ ExecutionBlockId sid3 = QueryIdFactory.newExecutionBlockId(queryId1, 1);
assertEquals(-1, sid1.compareTo(sid2));
assertEquals(1, sid2.compareTo(sid1));
assertEquals(0, sid3.compareTo(sid1));
@@ -121,33 +121,33 @@ public class TestTajoIds {
@Test
public void testConstructFromString() {
- QueryIdFactory.reset();
+// QueryIdFactory.reset();
QueryId qid1 = QueryIdFactory.newQueryId();
- QueryId qid2 = TajoIdUtils.createQueryId(qid1.toString());
+ QueryId qid2 = TajoIdUtils.parseQueryId(qid1.toString());
assertEquals(qid1, qid2);
-
- SubQueryId sub1 = QueryIdFactory.newSubQueryId(qid1);
- SubQueryId sub2 = TajoIdUtils.newSubQueryId(sub1.toString());
+
+ ExecutionBlockId sub1 = QueryIdFactory.newExecutionBlockId(qid1);
+ ExecutionBlockId sub2 = TajoIdUtils.createExecutionBlockId(sub1.toString());
assertEquals(sub1, sub2);
QueryUnitId u1 = QueryIdFactory.newQueryUnitId(sub1);
- QueryUnitId u2 = new QueryUnitId(u1.toString());
+ QueryUnitId u2 = new QueryUnitId(u1.getProto());
assertEquals(u1, u2);
QueryUnitAttemptId attempt1 = new QueryUnitAttemptId(u1, 1);
- QueryUnitAttemptId attempt2 = new QueryUnitAttemptId(attempt1.toString());
+ QueryUnitAttemptId attempt2 = new QueryUnitAttemptId(attempt1.getProto());
assertEquals(attempt1, attempt2);
}
@Test
public void testConstructFromPB() {
- QueryIdFactory.reset();
+// QueryIdFactory.reset();
QueryId qid1 = QueryIdFactory.newQueryId();
QueryId qid2 = new QueryId(qid1.getProto());
assertEquals(qid1, qid2);
- SubQueryId sub1 = QueryIdFactory.newSubQueryId(qid1);
- SubQueryId sub2 = new SubQueryId(sub1.getProto());
+ ExecutionBlockId sub1 = QueryIdFactory.newExecutionBlockId(qid1);
+ ExecutionBlockId sub2 = TajoIdUtils.createExecutionBlockId(sub1.toString());
assertEquals(sub1, sub2);
QueryUnitId u1 = QueryIdFactory.newQueryUnitId(sub1);
@@ -159,9 +159,9 @@ public class TestTajoIds {
assertEquals(attempt1, attempt2);
}
- public static QueryId createQueryId(long timestamp, int id, int attemptId) {
+ public static QueryId createQueryId(long timestamp, int id) {
ApplicationId appId = BuilderUtils.newApplicationId(timestamp, id);
- return TajoIdUtils.createQueryId(appId, attemptId);
+ return QueryIdFactory.newQueryId(appId.toString());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TpchTestBase.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TpchTestBase.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TpchTestBase.java
index c761103..ad3d676 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TpchTestBase.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TpchTestBase.java
@@ -49,7 +49,6 @@ public class TpchTestBase {
try {
testBase = new TpchTestBase();
testBase.setUp();
- Runtime.getRuntime().addShutdownHook(new ShutdownHook());
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
@@ -107,19 +106,11 @@ public class TpchTestBase {
return util.getTestingCluster();
}
- public static class ShutdownHook extends Thread {
-
- @Override
- public void run() {
- try {
- testBase.tearDown();
- } catch (IOException e) {
- LOG.error(e);
- }
+ public void tearDown() throws IOException {
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException e) {
}
- }
-
- private void tearDown() throws IOException {
util.shutdown();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/plan/global/TestGlobalQueryPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/plan/global/TestGlobalQueryPlanner.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/plan/global/TestGlobalQueryPlanner.java
index cc75726..ede73c5 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/plan/global/TestGlobalQueryPlanner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/plan/global/TestGlobalQueryPlanner.java
@@ -136,7 +136,6 @@ public class TestGlobalQueryPlanner {
catalog.addTable(desc);
}
- QueryIdFactory.reset();
queryId = QueryIdFactory.newQueryId();
dispatcher.stop();
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestGlobalQueryOptimizer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestGlobalQueryOptimizer.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestGlobalQueryOptimizer.java
index 4455763..c665b44 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestGlobalQueryOptimizer.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/global/TestGlobalQueryOptimizer.java
@@ -16,9 +16,6 @@
* limitations under the License.
*/
-/**
- *
- */
package org.apache.tajo.engine.planner.global;
import org.apache.hadoop.fs.FileSystem;
@@ -128,11 +125,11 @@ public class TestGlobalQueryOptimizer {
catalog.addTable(desc);
}
- QueryIdFactory.reset();
+ //QueryIdFactory.reset();
queryId = QueryIdFactory.newQueryId();
optimizer = new GlobalOptimizer();
}
-
+
@AfterClass
public static void terminate() throws IOException {
util.shutdownCatalogCluster();
@@ -147,7 +144,7 @@ public class TestGlobalQueryOptimizer {
MasterPlan globalPlan = planner.build(queryId, (LogicalRootNode) rootNode);
globalPlan = optimizer.optimize(globalPlan);
-
+
ExecutionBlock unit = globalPlan.getRoot();
StoreTableNode store = unit.getStoreTableNode();
assertEquals(NodeType.PROJECTION, store.getChild().getType());
@@ -156,14 +153,14 @@ public class TestGlobalQueryOptimizer {
SortNode sort = (SortNode) proj.getChild();
assertEquals(NodeType.SCAN, sort.getChild().getType());
ScanNode scan = (ScanNode) sort.getChild();
-
+
assertTrue(unit.hasChildBlock());
unit = unit.getChildBlock(scan);
store = unit.getStoreTableNode();
assertEquals(NodeType.SORT, store.getChild().getType());
sort = (SortNode) store.getChild();
assertEquals(NodeType.JOIN, sort.getChild().getType());
-
+
assertTrue(unit.hasChildBlock());
for (ScanNode prevscan : unit.getScanNodes()) {
ExecutionBlock prev = unit.getChildBlock(prevscan);
@@ -171,4 +168,4 @@ public class TestGlobalQueryOptimizer {
assertEquals(NodeType.SCAN, store.getChild().getType());
}
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
index dbb3862..75e3b1e 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
@@ -25,7 +25,6 @@ import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.tajo.QueryIdFactory;
import org.apache.tajo.QueryUnitAttemptId;
import org.apache.tajo.TajoTestingCluster;
import org.apache.tajo.TaskAttemptContext;
@@ -78,7 +77,6 @@ public class TestPhysicalPlanner {
@BeforeClass
public static void setUp() throws Exception {
- QueryIdFactory.reset();
util = new TajoTestingCluster();
util.startCatalogCluster();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestRepartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestRepartitioner.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestRepartitioner.java
index a8924dd..843df23 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestRepartitioner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestRepartitioner.java
@@ -18,14 +18,13 @@
package org.apache.tajo.master;
+import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.QueryId;
-import org.apache.tajo.SubQueryId;
import org.apache.tajo.TestTajoIds;
import org.apache.tajo.master.ExecutionBlock.PartitionType;
import org.apache.tajo.master.querymaster.QueryUnit;
import org.apache.tajo.master.querymaster.Repartitioner;
import org.apache.tajo.util.TUtil;
-import org.apache.tajo.util.TajoIdUtils;
import org.jboss.netty.handler.codec.http.QueryStringDecoder;
import org.junit.Test;
@@ -37,10 +36,10 @@ import static junit.framework.Assert.assertEquals;
public class TestRepartitioner {
@Test
public void testCreateHashFetchURL() throws Exception {
- QueryId q1 = TestTajoIds.createQueryId(1315890136000l, 2, 1);
+ QueryId q1 = TestTajoIds.createQueryId(1315890136000l, 2);
String hostName = "tajo1";
int port = 1234;
- SubQueryId sid = TajoIdUtils.createSubQueryId(q1, 2);
+ ExecutionBlockId sid = new ExecutionBlockId(q1, 2);
int partitionId = 2;
List<QueryUnit.IntermediateEntry> intermediateEntries = TUtil.newList();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TaskRunnerTest.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TaskRunnerTest.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TaskRunnerTest.java
index 05a269e..952cb0f 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TaskRunnerTest.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TaskRunnerTest.java
@@ -25,31 +25,27 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.util.BuilderUtils;
-import org.apache.tajo.QueryConf;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.SubQueryId;
-import org.apache.tajo.TestTajoIds;
-import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService;
+import org.apache.tajo.*;
+import org.apache.tajo.ipc.TajoWorkerProtocol.TajoWorkerProtocolService;
import org.apache.tajo.rpc.ProtoAsyncRpcClient;
-import org.apache.tajo.util.TajoIdUtils;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TaskRunnerTest {
long ts1 = 1315890136000l;
- QueryId q1 = TestTajoIds.createQueryId(ts1, 2, 5);
- SubQueryId sq1 = TajoIdUtils.createSubQueryId(q1, 5);
+ QueryId q1 = TestTajoIds.createQueryId(ts1, 2);
+ ExecutionBlockId sq1 = QueryIdFactory.newExecutionBlockId(q1, 5);
//@Test
public void testInit() throws Exception {
ProtoAsyncRpcClient mockClient = mock(ProtoAsyncRpcClient.class);
mockClient.close();
- QueryMasterProtocolService.Interface mockMaster =
- mock(QueryMasterProtocolService.Interface.class);
+ TajoWorkerProtocolService.Interface mockMaster =
+ mock(TajoWorkerProtocolService.Interface.class);
ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
- q1.getApplicationId(), q1.getAttemptId());
+ BuilderUtils.newApplicationId(Integer.parseInt(q1.getId()), q1.getSeq()), 1);
ContainerId cId = BuilderUtils.newContainerId(appAttemptId, 1);
NodeId nodeId = RecordFactoryProvider.getRecordFactory(null).
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
index 09ab483..cf1e9ae 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
@@ -23,7 +23,6 @@ import com.google.common.collect.Maps;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.tajo.QueryIdFactory;
import org.apache.tajo.TajoTestingCluster;
import org.apache.tajo.TaskAttemptContext;
import org.apache.tajo.algebra.Expr;
@@ -73,7 +72,6 @@ public class TestRangeRetrieverHandler {
@Before
public void setUp() throws Exception {
- QueryIdFactory.reset();
util = new TajoTestingCluster();
conf = util.getConfiguration();
testDir = CommonTestingUtil.getTestDir("target/test-data/TestRangeRetrieverHandler");
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/dataserver/TestHttpDataServer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/dataserver/TestHttpDataServer.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/dataserver/TestHttpDataServer.java
index 6e6fdae..b70dda2 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/dataserver/TestHttpDataServer.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/dataserver/TestHttpDataServer.java
@@ -19,15 +19,15 @@
package org.apache.tajo.worker.dataserver;
import org.apache.hadoop.net.NetUtils;
-import org.junit.Before;
-import org.junit.Test;
+import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.QueryIdFactory;
import org.apache.tajo.QueryUnitId;
-import org.apache.tajo.SubQueryId;
import org.apache.tajo.util.CommonTestingUtil;
import org.apache.tajo.worker.InterDataRetriever;
import org.apache.tajo.worker.dataserver.retriever.DataRetriever;
import org.apache.tajo.worker.dataserver.retriever.DirectoryRetriever;
+import org.junit.Before;
+import org.junit.Test;
import java.io.*;
import java.net.InetSocketAddress;
@@ -77,8 +77,7 @@ public class TestHttpDataServer {
@Test
public final void testInterDataRetriver() throws Exception {
- QueryIdFactory.reset();
- SubQueryId schid = QueryIdFactory.newSubQueryId(
+ ExecutionBlockId schid = QueryIdFactory.newExecutionBlockId(
QueryIdFactory.newQueryId());
QueryUnitId qid1 = QueryIdFactory.newQueryUnitId(schid);
QueryUnitId qid2 = QueryIdFactory.newQueryUnitId(schid);
@@ -119,8 +118,7 @@ public class TestHttpDataServer {
@Test(expected = FileNotFoundException.class)
public final void testNoSuchFile() throws Exception {
- QueryIdFactory.reset();
- SubQueryId schid = QueryIdFactory.newSubQueryId(
+ ExecutionBlockId schid = QueryIdFactory.newExecutionBlockId(
QueryIdFactory.newQueryId());
QueryUnitId qid1 = QueryIdFactory.newQueryUnitId(schid);
QueryUnitId qid2 = QueryIdFactory.newQueryUnitId(schid);