You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/08/26 14:29:15 UTC
[6/8] TAJO-127: Implement Tajo Resource Manager. (hyoungjunkim via
hyunsik)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskRunnerLauncherImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskRunnerLauncherImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskRunnerLauncherImpl.java
deleted file mode 100644
index e6d4c56..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskRunnerLauncherImpl.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.service.AbstractService;
-import org.apache.tajo.SubQueryId;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.master.TaskRunnerGroupEvent.EventType;
-import org.apache.tajo.master.event.QueryEvent;
-import org.apache.tajo.master.event.QueryEventType;
-import org.apache.tajo.master.querymaster.QueryMaster;
-import org.apache.tajo.master.querymaster.QueryMaster.QueryContext;
-import org.apache.tajo.worker.TaskRunner;
-
-import java.util.Collection;
-import java.util.Map;
-import java.util.Vector;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-public class TaskRunnerLauncherImpl extends AbstractService implements TaskRunnerLauncher {
-
- /** Class Logger */
- private static final Log LOG = LogFactory.getLog(TaskRunnerLauncherImpl.class);
- private QueryContext context;
- private final String queryMasterHost;
- private final int queryMasterPort;
-
- // For ContainerLauncherSpec
- private ContainerLaunchContext commonContainerSpec = null;
-
- /** for launching TaskRunners in parallel */
- private final ExecutorService executorService;
-
- public TaskRunnerLauncherImpl(QueryContext context) {
- super(TaskRunnerLauncherImpl.class.getName());
- this.context = context;
- queryMasterHost = context.getQueryMasterServiceAddress().getHostName();
- queryMasterPort = context.getQueryMasterServiceAddress().getPort();
- executorService = Executors.newFixedThreadPool(
- context.getConf().getIntVar(TajoConf.ConfVars.AM_TASKRUNNER_LAUNCH_PARALLEL_NUM));
- }
-
- public void start() {
- super.start();
- }
-
- public void stop() {
- executorService.shutdownNow();
- Map<ContainerId, ContainerProxy> containers = context.getContainers();
- for(ContainerProxy eachProxy: containers.values()) {
- try {
- eachProxy.kill();
- } catch (Exception e) {
- }
- }
- super.stop();
- }
-
- @Override
- public void handle(TaskRunnerGroupEvent event) {
- if (event.getType() == EventType.CONTAINER_REMOTE_LAUNCH) {
- launchTaskRunners(event.subQueryId, event.getContainers());
- } else if (event.getType() == EventType.CONTAINER_REMOTE_CLEANUP) {
- killTaskRunners(event.getContainers());
- }
- }
-
- private void launchTaskRunners(SubQueryId subQueryId, Collection<Container> containers) {
- commonContainerSpec = ContainerProxy.createCommonContainerLaunchContext(getConfig(), subQueryId.toString(), false);
- for (Container container : containers) {
- final ContainerProxy proxy =
- new TaskRunnerContainerProxy(context, getConfig(), context.getYarnRPC(), container, subQueryId);
- executorService.submit(new LaunchRunner(container.getId(), proxy));
- }
- }
-
- private class LaunchRunner implements Runnable {
- private final ContainerProxy proxy;
- private final ContainerId id;
-
- public LaunchRunner(ContainerId id, ContainerProxy proxy) {
- this.proxy = proxy;
- this.id = id;
- }
- @Override
- public void run() {
- proxy.launch(commonContainerSpec);
- LOG.info("ContainerProxy started:" + id);
- }
- }
-
- private void killTaskRunners(Collection<Container> containers) {
- for (Container container : containers) {
- final ContainerProxy proxy = context.getContainer(container.getId());
- executorService.submit(new KillRunner(container.getId(), proxy));
- }
- }
-
- private class KillRunner implements Runnable {
- private final ContainerProxy proxy;
- private final ContainerId id;
- public KillRunner(ContainerId id, ContainerProxy proxy) {
- this.id = id;
- this.proxy = proxy;
- }
-
- @Override
- public void run() {
- proxy.kill();
- LOG.info("ContainerProxy killed:" + id);
- }
- }
-
- public class TaskRunnerContainerProxy extends ContainerProxy {
- private final SubQueryId subQueryId;
-
- public TaskRunnerContainerProxy(QueryMaster.QueryContext context, Configuration conf, YarnRPC yarnRPC,
- Container container, SubQueryId subQueryId) {
- super(context, conf, yarnRPC, container);
- this.subQueryId = subQueryId;
- }
-
- @Override
- protected void containerStarted() {
- context.getEventHandler().handle(new QueryEvent(context.getQueryId(), QueryEventType.INIT_COMPLETED));
- }
-
- @Override
- protected String getId() {
- return subQueryId.toString();
- }
-
- @Override
- protected String getRunnerClass() {
- return TaskRunner.class.getCanonicalName();
- }
-
- @Override
- protected Vector<CharSequence> getTaskParams() {
- Vector<CharSequence> taskParams = new Vector<CharSequence>();
- taskParams.add(queryMasterHost); // queryMaster hostname
- taskParams.add(String.valueOf(queryMasterPort)); // queryMaster port
-
- return taskParams;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
index c9702b4..651f9c0 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
@@ -23,28 +23,25 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.util.RackResolver;
+import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.QueryIdFactory;
import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.SubQueryId;
import org.apache.tajo.engine.planner.logical.ScanNode;
import org.apache.tajo.engine.query.QueryUnitRequestImpl;
-import org.apache.tajo.ipc.QueryMasterProtocol;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.ipc.protocolrecords.QueryUnitRequest;
import org.apache.tajo.master.event.TaskAttemptAssignedEvent;
import org.apache.tajo.master.event.TaskRequestEvent;
-import org.apache.tajo.master.event.TaskRequestEvent.TaskRequestEventType;
import org.apache.tajo.master.event.TaskScheduleEvent;
import org.apache.tajo.master.event.TaskSchedulerEvent;
import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
-import org.apache.tajo.master.querymaster.QueryMaster.QueryContext;
+import org.apache.tajo.master.querymaster.QueryMasterTask;
import org.apache.tajo.master.querymaster.QueryUnit;
import org.apache.tajo.storage.Fragment;
import org.apache.tajo.util.NetUtils;
-import org.apache.tajo.util.TajoIdUtils;
import java.net.URI;
import java.util.*;
@@ -55,8 +52,8 @@ public class TaskSchedulerImpl extends AbstractService
implements TaskScheduler {
private static final Log LOG = LogFactory.getLog(TaskScheduleEvent.class);
- private final QueryContext context;
- private AsyncDispatcher dispatcher;
+ private final QueryMasterTask.QueryContext context;
+ private TajoAsyncDispatcher dispatcher;
private Thread eventHandlingThread;
private Thread schedulingThread;
@@ -72,21 +69,22 @@ public class TaskSchedulerImpl extends AbstractService
private int rackLocalAssigned = 0;
private int totalAssigned = 0;
- public TaskSchedulerImpl(QueryContext context) {
+ public TaskSchedulerImpl(QueryMasterTask.QueryContext context) {
super(TaskSchedulerImpl.class.getName());
this.context = context;
this.dispatcher = context.getDispatcher();
}
+ @Override
public void init(Configuration conf) {
scheduledRequests = new ScheduledRequests();
taskRequests = new TaskRequests();
- dispatcher.register(TaskRequestEventType.class, taskRequests);
super.init(conf);
}
+ @Override
public void start() {
LOG.info("Start TaskScheduler");
this.eventHandlingThread = new Thread() {
@@ -113,13 +111,14 @@ public class TaskSchedulerImpl extends AbstractService
while(!stopEventHandling && !Thread.currentThread().isInterrupted()) {
try {
- Thread.sleep(100);
+ Thread.sleep(1000);
} catch (InterruptedException e) {
break;
}
schedule();
}
+ //req.getCallback().run(stopTaskRunnerReq);
LOG.info("TaskScheduler schedulingThread stopped");
}
};
@@ -128,15 +127,15 @@ public class TaskSchedulerImpl extends AbstractService
super.start();
}
- private static final QueryUnitAttemptId NULL_ID;
- private static final QueryMasterProtocol.QueryUnitRequestProto stopTaskRunnerReq;
+ private static final QueryUnitAttemptId NULL_ATTEMPT_ID;
+ public static final TajoWorkerProtocol.QueryUnitRequestProto stopTaskRunnerReq;
static {
- SubQueryId nullSubQuery =
- QueryIdFactory.newSubQueryId(TajoIdUtils.NullQueryId);
- NULL_ID = QueryIdFactory.newQueryUnitAttemptId(QueryIdFactory.newQueryUnitId(nullSubQuery, 0), 0);
+ ExecutionBlockId nullSubQuery = QueryIdFactory.newExecutionBlockId(QueryIdFactory.NULL_QUERY_ID, 0);
+ NULL_ATTEMPT_ID = QueryIdFactory.newQueryUnitAttemptId(QueryIdFactory.newQueryUnitId(nullSubQuery, 0), 0);
- QueryMasterProtocol.QueryUnitRequestProto.Builder builder = QueryMasterProtocol.QueryUnitRequestProto.newBuilder();
- builder.setId(NULL_ID.getProto());
+ TajoWorkerProtocol.QueryUnitRequestProto.Builder builder =
+ TajoWorkerProtocol.QueryUnitRequestProto.newBuilder();
+ builder.setId(NULL_ATTEMPT_ID.getProto());
builder.setShouldDie(true);
builder.setOutputTable("");
builder.setSerializedData("");
@@ -144,7 +143,7 @@ public class TaskSchedulerImpl extends AbstractService
stopTaskRunnerReq = builder.build();
}
-
+ @Override
public void stop() {
stopEventHandling = true;
eventHandlingThread.interrupt();
@@ -205,12 +204,12 @@ public class TaskSchedulerImpl extends AbstractService
public void handle(TaskSchedulerEvent event) {
int qSize = eventQueue.size();
if (qSize != 0 && qSize % 1000 == 0) {
- LOG.info("Size of event-queue in RMContainerAllocator is " + qSize);
+ LOG.info("Size of event-queue in YarnRMContainerAllocator is " + qSize);
}
int remCapacity = eventQueue.remainingCapacity();
if (remCapacity < 1000) {
LOG.warn("Very low remaining capacity in the event-queue "
- + "of RMContainerAllocator: " + remCapacity);
+ + "of YarnRMContainerAllocator: " + remCapacity);
}
try {
@@ -220,20 +219,29 @@ public class TaskSchedulerImpl extends AbstractService
}
}
+ /**
+ * Hands a task request straight to the internal task-request handler.
+ *
+ * @param event the task request to pass to {@code taskRequests.handle}
+ */
+ public void handleTaskRequestEvent(TaskRequestEvent event) {
+ taskRequests.handle(event);
+ }
+
private class TaskRequests implements EventHandler<TaskRequestEvent> {
private final LinkedBlockingQueue<TaskRequestEvent> taskRequestQueue =
new LinkedBlockingQueue<TaskRequestEvent>();
@Override
public void handle(TaskRequestEvent event) {
+ LOG.info("====>TaskRequest:" + event.getContainerId() + "," + event.getExecutionBlockId());
+ if(stopEventHandling) {
+ event.getCallback().run(stopTaskRunnerReq);
+ return;
+ }
int qSize = taskRequestQueue.size();
if (qSize != 0 && qSize % 1000 == 0) {
- LOG.info("Size of event-queue in RMContainerAllocator is " + qSize);
+ LOG.info("Size of event-queue in YarnRMContainerAllocator is " + qSize);
}
int remCapacity = taskRequestQueue.remainingCapacity();
if (remCapacity < 1000) {
LOG.warn("Very low remaining capacity in the event-queue "
- + "of RMContainerAllocator: " + remCapacity);
+ + "of YarnRMContainerAllocator: " + remCapacity);
}
taskRequestQueue.add(event);
@@ -380,15 +388,16 @@ public class TaskSchedulerImpl extends AbstractService
return nonLeafTasks.size();
}
- public Set<QueryUnitAttemptId> AssignedRequest = new HashSet<QueryUnitAttemptId>();
+ public Set<QueryUnitAttemptId> assignedRequest = new HashSet<QueryUnitAttemptId>();
+
public void assignToLeafTasks(List<TaskRequestEvent> taskRequests) {
Iterator<TaskRequestEvent> it = taskRequests.iterator();
- LOG.info("Got task requests " + taskRequests.size());
TaskRequestEvent taskRequest;
while (it.hasNext() && leafTasks.size() > 0) {
taskRequest = it.next();
- ContainerProxy container = context.getContainer(taskRequest.getContainerId());
+ LOG.info("====> assignToLeafTasks: " + taskRequest.getExecutionBlockId());
+ ContainerProxy container = context.getResourceAllocator().getContainer(taskRequest.getContainerId());
String host = container.getTaskHostName();
QueryUnitAttemptId attemptId = null;
@@ -443,7 +452,7 @@ public class TaskSchedulerImpl extends AbstractService
if (attemptId != null) {
QueryUnit task = context.getQuery()
- .getSubQuery(attemptId.getSubQueryId()).getQueryUnit(attemptId.getQueryUnitId());
+ .getSubQuery(attemptId.getQueryUnitId().getExecutionBlockId()).getQueryUnit(attemptId.getQueryUnitId());
QueryUnitRequest taskAssign = new QueryUnitRequestImpl(
attemptId,
new ArrayList<Fragment>(task.getAllFragments()),
@@ -457,7 +466,7 @@ public class TaskSchedulerImpl extends AbstractService
context.getEventHandler().handle(new TaskAttemptAssignedEvent(attemptId,
taskRequest.getContainerId(),
host, container.getTaskPort()));
- AssignedRequest.add(attemptId);
+ assignedRequest.add(attemptId);
totalAssigned++;
taskRequest.getCallback().run(taskAssign.getProto());
@@ -476,6 +485,7 @@ public class TaskSchedulerImpl extends AbstractService
TaskRequestEvent taskRequest;
while (it.hasNext()) {
taskRequest = it.next();
+ LOG.info("====> assignToNonLeafTasks: " + taskRequest.getExecutionBlockId());
QueryUnitAttemptId attemptId;
// random allocation
@@ -485,7 +495,8 @@ public class TaskSchedulerImpl extends AbstractService
LOG.debug("Assigned based on * match");
QueryUnit task;
- task = context.getSubQuery(attemptId.getSubQueryId()).getQueryUnit(attemptId.getQueryUnitId());
+ task = context.getSubQuery(
+ attemptId.getQueryUnitId().getExecutionBlockId()).getQueryUnit(attemptId.getQueryUnitId());
QueryUnitRequest taskAssign = new QueryUnitRequestImpl(
attemptId,
Lists.newArrayList(task.getAllFragments()),
@@ -504,7 +515,7 @@ public class TaskSchedulerImpl extends AbstractService
}
}
- ContainerProxy container = context.getContainer(
+ ContainerProxy container = context.getResourceAllocator().getContainer(
taskRequest.getContainerId());
context.getEventHandler().handle(new TaskAttemptAssignedEvent(attemptId,
taskRequest.getContainerId(), container.getTaskHostName(), container.getTaskPort()));
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnContainerProxy.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnContainerProxy.java
new file mode 100644
index 0000000..1e650e1
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnContainerProxy.java
@@ -0,0 +1,446 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ContainerManager;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
+import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.ProtoUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryConf;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.master.event.QueryEvent;
+import org.apache.tajo.master.event.QueryEventType;
+import org.apache.tajo.master.querymaster.QueryMasterTask;
+import org.apache.tajo.pullserver.PullServerAuxService;
+import org.apache.tajo.worker.TajoWorker;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedAction;
+import java.util.*;
+
+public class YarnContainerProxy extends ContainerProxy {
+ private final static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+
+ protected final YarnRPC yarnRPC;
+ final protected String containerMgrAddress;
+ protected ContainerToken containerToken;
+
+  /**
+   * Creates a proxy for one allocated YARN container.
+   *
+   * The ContainerManager address is derived from the container's node id in
+   * "host:port" form, and the container's token is remembered alongside it.
+   *
+   * @param context query context, handed to the parent proxy
+   * @param conf configuration, handed to the parent proxy
+   * @param yarnRPC RPC factory used to obtain ContainerManager proxies
+   * @param container the allocated YARN container
+   * @param executionBlockId id of the execution block, handed to the parent proxy
+   */
+  public YarnContainerProxy(QueryMasterTask.QueryContext context, Configuration conf, YarnRPC yarnRPC,
+                            Container container, ExecutionBlockId executionBlockId) {
+    super(context, conf, executionBlockId, container);
+
+    final NodeId allocatedNode = container.getNodeId();
+    this.containerToken = container.getContainerToken();
+    this.containerMgrAddress = allocatedNode.getHost() + ":" + allocatedNode.getPort();
+    this.yarnRPC = yarnRPC;
+  }
+
+  /**
+   * Opens an RPC proxy to the ContainerManager at the given address.
+   *
+   * The proxy is created on behalf of the current user. When Hadoop security is
+   * enabled, a remote user named after the container id is used instead, and the
+   * container token (converted for that address) is attached to it.
+   *
+   * @param containerID id of the container; used as the remote user name
+   * @param containerManagerBindAddr ContainerManager address in "host:port" form
+   * @param containerToken token attached to the remote user when security is enabled
+   * @return the proxy returned by {@code yarnRPC.getProxy}
+   * @throws IOException propagated from looking up the current user
+   */
+  protected ContainerManager getCMProxy(ContainerId containerID,
+                                        final String containerManagerBindAddr,
+                                        ContainerToken containerToken)
+      throws IOException {
+    String[] hostAndPort = containerManagerBindAddr.split(":");
+    final InetSocketAddress cmAddr =
+        new InetSocketAddress(hostAndPort[0], Integer.parseInt(hostAndPort[1]));
+
+    UserGroupInformation caller = UserGroupInformation.getCurrentUser();
+    if (UserGroupInformation.isSecurityEnabled()) {
+      Token<ContainerTokenIdentifier> cmToken =
+          ProtoUtils.convertFromProtoFormat(containerToken, cmAddr);
+      // In this context the remote user name has to be the container id.
+      caller = UserGroupInformation.createRemoteUser(containerID.toString());
+      caller.addToken(cmToken);
+    }
+
+    return caller.doAs(new PrivilegedAction<ContainerManager>() {
+      @Override
+      public ContainerManager run() {
+        return (ContainerManager) yarnRPC.getProxy(ContainerManager.class,
+            cmAddr, conf);
+      }
+    });
+  }
+
+  /**
+   * Starts this container through its ContainerManager and records the outcome
+   * in {@code state}.
+   *
+   * If the container is already marked KILLED_BEFORE_LAUNCH, nothing is started
+   * and the state becomes DONE. Otherwise a per-container launch context is
+   * derived from the given one, a start request is sent, and the pull server
+   * port is read from the service data of the response. On success an
+   * INIT_COMPLETED query event is posted, the state becomes RUNNING and this
+   * proxy is added to the resource allocator. Any failure is logged and leaves
+   * the state FAILED; nothing is rethrown to the caller.
+   *
+   * @param commonContainerLaunchContext launch context to derive this container's
+   *                                     own launch context from
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public synchronized void launch(ContainerLaunchContext commonContainerLaunchContext) {
+    LOG.info("Launching Container with Id: " + containerID);
+    if(this.state == ContainerState.KILLED_BEFORE_LAUNCH) {
+      state = ContainerState.DONE;
+      // The closing parenthesis was missing from this message.
+      LOG.error("Container (" + containerID + ") was killed before it was launched");
+      return;
+    }
+
+    ContainerManager proxy = null;
+    try {
+
+      proxy = getCMProxy(containerID, containerMgrAddress,
+          containerToken);
+
+      // Construct the actual Container
+      ContainerLaunchContext containerLaunchContext = createContainerLaunchContext(commonContainerLaunchContext);
+
+      // Now launch the actual container
+      StartContainerRequest startRequest = Records
+          .newRecord(StartContainerRequest.class);
+      startRequest.setContainerLaunchContext(containerLaunchContext);
+      StartContainerResponse response = proxy.startContainer(startRequest);
+
+      ByteBuffer portInfo = response
+          .getServiceResponse(PullServerAuxService.PULLSERVER_SERVICEID);
+
+      // The port is only updated when the response carries pull server data.
+      if(portInfo != null) {
+        port = PullServerAuxService.deserializeMetaData(portInfo);
+      }
+
+      LOG.info("PullServer port returned by ContainerManager for "
+          + containerID + " : " + port);
+
+      if(port < 0) {
+        // Thrown here, caught by the catch block below, which logs it.
+        this.state = ContainerState.FAILED;
+        throw new IllegalStateException("Invalid shuffle port number "
+            + port + " returned for " + containerID);
+      }
+
+      context.getEventHandler().handle(new QueryEvent(context.getQueryId(), QueryEventType.INIT_COMPLETED));
+
+      this.state = ContainerState.RUNNING;
+      this.hostName = containerMgrAddress.split(":")[0];
+      context.getResourceAllocator().addContainer(containerID, this);
+    } catch (Throwable t) {
+      String message = "Container launch failed for " + containerID + " : "
+          + StringUtils.stringifyException(t);
+      this.state = ContainerState.FAILED;
+      LOG.error(message);
+    } finally {
+      // The RPC proxy is only needed for the start call; always release it.
+      if (proxy != null) {
+        yarnRPC.stopProxy(proxy, conf);
+      }
+    }
+  }
+
+
+  /**
+   * Derives the launch context of this container from a shared launch context.
+   *
+   * The environment map is copied and every service-data buffer is duplicated;
+   * the user and the local resources are taken over from the shared context.
+   * The single launch command starts {@link TajoWorker} with the arguments
+   * assembled below and redirects stdout/stderr into the container log directory.
+   *
+   * @param commonContainerLaunchContext shared launch context to derive from
+   * @return a new launch context for this container's id and resource
+   */
+  public ContainerLaunchContext createContainerLaunchContext(ContainerLaunchContext commonContainerLaunchContext) {
+    // Private copy of the shared environment.
+    Map<String, String> sharedEnv = commonContainerLaunchContext.getEnvironment();
+    Map<String, String> containerEnv = new HashMap<String, String>(sharedEnv.size());
+    containerEnv.putAll(sharedEnv);
+
+    // Duplicate the ByteBuffers so each container reads through its own view.
+    Map<String, ByteBuffer> containerServiceData = new HashMap<String, ByteBuffer>();
+    for (Map.Entry<String, ByteBuffer> shared : commonContainerLaunchContext.getServiceData().entrySet()) {
+      containerServiceData.put(shared.getKey(), shared.getValue().duplicate());
+    }
+
+    // Command-line tokens, in the order they are handed to the shell.
+    List<CharSequence> tokens = new ArrayList<CharSequence>(30);
+    tokens.add("${JAVA_HOME}" + "/bin/java");       // java executable
+    tokens.add("-Xmx2000m");                        // fixed heap size
+    tokens.add(TajoWorker.class.getCanonicalName()); // main class
+    tokens.add("tr");                               // worker mode
+    tokens.add(getId());                            // subquery id
+    tokens.add(containerMgrAddress);                // node id
+    tokens.add(containerID.toString());             // container id
+
+    Vector<CharSequence> extraParams = getTaskParams();
+    if (extraParams != null) {
+      tokens.addAll(extraParams);
+    }
+
+    tokens.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
+    tokens.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
+
+    // Join into the final command; every token is followed by one space.
+    StringBuilder commandLine = new StringBuilder();
+    for (CharSequence token : tokens) {
+      commandLine.append(token).append(" ");
+    }
+    String fullCommand = commandLine.toString();
+
+    LOG.info("Completed setting up TaskRunner command " + fullCommand);
+    List<String> commands = new ArrayList<String>();
+    commands.add(fullCommand);
+
+    return BuilderUtils.newContainerLaunchContext(containerID, commonContainerLaunchContext.getUser(),
+        container.getResource(), commonContainerLaunchContext.getLocalResources(), containerEnv, commands,
+        containerServiceData, null, new HashMap<ApplicationAccessType, String>());
+  }
+
+  /**
+   * Builds the launch context that per-container launch contexts are derived from.
+   *
+   * The context carries the current user's short name, the environment (shell,
+   * JAVA_HOME, Hadoop homes and CLASSPATH), the query configuration file as a
+   * local resource, and the pull server service data. A failure while resolving
+   * the user, localizing the configuration file or serializing the service data
+   * is logged, and the partially filled context is still returned.
+   *
+   * @param config configuration to write out; it is cast to {@link TajoConf}
+   * @param queryId name of the directory under the warehouse dir holding the conf file
+   * @param isMaster if true the conf file is named QueryConf.QUERY_MASTER_FILENAME,
+   *                 otherwise QueryConf.FILENAME
+   * @return the launch context
+   */
+  public static ContainerLaunchContext createCommonContainerLaunchContext(Configuration config,
+                                                                          String queryId, boolean isMaster) {
+    TajoConf conf = (TajoConf)config;
+
+    ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
+    try {
+      ctx.setUser(UserGroupInformation.getCurrentUser().getShortUserName());
+    } catch (IOException e) {
+      // Report through the logger instead of stderr; continue without a user.
+      LOG.error(e.getMessage(), e);
+    }
+
+    ////////////////////////////////////////////////////////////////////////////
+    // Set the env variables to be setup
+    ////////////////////////////////////////////////////////////////////////////
+    LOG.info("Set the environment for the application master");
+
+    Map<String, String> environment = new HashMap<String, String>();
+    environment.put(ApplicationConstants.Environment.SHELL.name(), "/bin/bash");
+    String javaHome = System.getenv(ApplicationConstants.Environment.JAVA_HOME.name());
+    if(javaHome != null) {
+      environment.put(ApplicationConstants.Environment.JAVA_HOME.name(), javaHome);
+    }
+
+    // TODO - to be improved with org.apache.tajo.sh shell script
+    Properties prop = System.getProperties();
+
+    if (prop.getProperty("tajo.test", "FALSE").equalsIgnoreCase("TRUE") ||
+        (System.getenv("tajo.test") != null && System.getenv("tajo.test").equalsIgnoreCase("TRUE"))) {
+      LOG.info("=========> tajo.test is TRUE");
+      // Test mode: hand the classpath of the current JVM to the container.
+      environment.put(ApplicationConstants.Environment.CLASSPATH.name(), prop.getProperty(
+          "java.class.path", null));
+      environment.put("tajo.test", "TRUE");
+    } else {
+      // Add AppMaster.jar location to classpath
+      // At some point we should not be required to add
+      // the hadoop specific classpaths to the env.
+      // It should be provided out of the box.
+      // For now setting all required classpaths including
+      // the classpath to "." for the application jar
+      StringBuilder classPathEnv = new StringBuilder("./");
+      for (String c : YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH) {
+        classPathEnv.append(':');
+        classPathEnv.append(c.trim());
+      }
+
+      // Append the Tajo classpath only when it is set; concatenating the raw
+      // getenv() result would add a literal "null" entry to the classpath.
+      String tajoBaseClasspath = System.getenv("TAJO_BASE_CLASSPATH");
+      if(tajoBaseClasspath != null) {
+        classPathEnv.append(':').append(tajoBaseClasspath);
+      }
+      classPathEnv.append(":./log4j.properties:./*");
+
+      String hadoopHome = System.getenv("HADOOP_HOME");
+      if(hadoopHome != null) {
+        environment.put("HADOOP_HOME", hadoopHome);
+        environment.put(
+            ApplicationConstants.Environment.HADOOP_COMMON_HOME.name(),
+            hadoopHome);
+        environment.put(
+            ApplicationConstants.Environment.HADOOP_HDFS_HOME.name(),
+            hadoopHome);
+        environment.put(
+            ApplicationConstants.Environment.HADOOP_YARN_HOME.name(),
+            hadoopHome);
+      }
+
+      if(tajoBaseClasspath != null) {
+        environment.put("TAJO_BASE_CLASSPATH", tajoBaseClasspath);
+      }
+      environment.put(ApplicationConstants.Environment.CLASSPATH.name(), classPathEnv.toString());
+    }
+
+    ctx.setEnvironment(environment);
+
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("=================================================");
+      for(Map.Entry<String, String> entry: environment.entrySet()) {
+        LOG.debug(entry.getKey() + "=" + entry.getValue());
+      }
+      LOG.debug("=================================================");
+    }
+    ////////////////////////////////////////////////////////////////////////////
+    // Set the local resources
+    ////////////////////////////////////////////////////////////////////////////
+    Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+
+    LOG.info("defaultFS: " + conf.get("fs.default.name"));
+    LOG.info("defaultFS: " + conf.get("fs.defaultFS"));
+    try {
+      // Both handles are acquired inside the try block so that a failure is
+      // logged below instead of surfacing later as a NullPointerException.
+      FileSystem fs = FileSystem.get(conf);
+      FileContext fsCtx = FileContext.getFileContext(conf);
+
+      LOG.info("Writing a QueryConf to HDFS and add to local environment");
+      // TODO move to tajo temp
+      Path warehousePath = new Path(conf.getVar(TajoConf.ConfVars.ROOT_DIR), TajoConstants.WAREHOUSE_DIR);
+      Path queryConfPath = new Path(warehousePath, queryId);
+      if(isMaster) {
+        queryConfPath = new Path(queryConfPath, QueryConf.QUERY_MASTER_FILENAME);
+      } else {
+        queryConfPath = new Path(queryConfPath, QueryConf.FILENAME);
+      }
+
+      if(!fs.exists(queryConfPath)){
+        writeConf(conf, queryConfPath);
+      } else {
+        LOG.warn("QueryConf already exist. path: " + queryConfPath.toString());
+      }
+
+      LocalResource queryConfSrc = createApplicationResource(fsCtx,
+          queryConfPath, LocalResourceType.FILE);
+      localResources.put(queryConfPath.getName(), queryConfSrc);
+
+      ctx.setLocalResources(localResources);
+    } catch (IOException e) {
+      LOG.error(e.getMessage(), e);
+    }
+
+    // TODO - move to sub-class
+    // Add shuffle token
+    Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
+    try {
+      serviceData.put(PullServerAuxService.PULLSERVER_SERVICEID,
+          PullServerAuxService.serializeMetaData(0));
+    } catch (IOException ioe) {
+      // Pass the throwable as well so the stack trace is kept.
+      LOG.error(ioe.getMessage(), ioe);
+    }
+    ctx.setServiceData(serviceData);
+
+    return ctx;
+  }
+
+  /**
+   * Describes a file as a YARN local resource with APPLICATION visibility.
+   *
+   * Size and timestamp come from the file's status; the resource URL is the
+   * status path resolved by the default file system of the given context.
+   *
+   * @param fs file context used to look up the file
+   * @param p path of the file to describe
+   * @param type resource type to record
+   * @return the populated local resource record
+   * @throws IOException propagated from the file status lookup or path resolution
+   */
+  private static LocalResource createApplicationResource(FileContext fs,
+                                                         Path p, LocalResourceType type)
+      throws IOException {
+    LocalResource resource = recordFactory.newRecordInstance(LocalResource.class);
+    final FileStatus status = fs.getFileStatus(p);
+    final Path resolved = fs.getDefaultFileSystem().resolvePath(status.getPath());
+
+    resource.setResource(ConverterUtils.getYarnUrlFromPath(resolved));
+    resource.setSize(status.getLen());
+    resource.setTimestamp(status.getModificationTime());
+    resource.setType(type);
+    resource.setVisibility(LocalResourceVisibility.APPLICATION);
+    return resource;
+  }
+
+  /**
+   * Serializes the configuration as XML into the given file.
+   *
+   * The file is created on the file system that owns the path, using
+   * QUERYCONF_FILE_PERMISSION, and the stream is closed even if writing fails.
+   *
+   * @param conf configuration to write
+   * @param queryConfFile destination file
+   * @throws IOException propagated from creating, writing or closing the file
+   */
+  private static void writeConf(Configuration conf, Path queryConfFile)
+      throws IOException {
+    final FileSystem targetFs = queryConfFile.getFileSystem(conf);
+    final FsPermission permission = new FsPermission(QUERYCONF_FILE_PERMISSION);
+    final FSDataOutputStream xmlOut = FileSystem.create(targetFs, queryConfFile, permission);
+    try {
+      conf.writeXml(xmlOut);
+    } finally {
+      xmlOut.close();
+    }
+  }
+
+  /**
+   * Stops this container and moves it to a terminal state.
+   *
+   * Nothing happens when {@code isCompletelyDone()} is already true. A container
+   * still in PREP is only marked KILLED_BEFORE_LAUNCH, which {@code launch}
+   * checks before starting anything. Otherwise a stop request is sent to the
+   * ContainerManager and the container is removed from the resource allocator.
+   * A failure during that cleanup is logged and ignored; the state ends up DONE
+   * either way.
+   */
+  @Override
+  public synchronized void stopContainer() {
+    if (isCompletelyDone()) {
+      return;
+    }
+
+    if (this.state == ContainerState.PREP) {
+      this.state = ContainerState.KILLED_BEFORE_LAUNCH;
+      return;
+    }
+
+    LOG.info("KILLING " + containerID);
+
+    ContainerManager cmProxy = null;
+    try {
+      cmProxy = getCMProxy(this.containerID, this.containerMgrAddress,
+          this.containerToken);
+
+      // Kill the remote container if it has already been launched.
+      StopContainerRequest request = Records.newRecord(StopContainerRequest.class);
+      request.setContainerId(this.containerID);
+      cmProxy.stopContainer(request);
+
+      // stopContainer returned without an error, so the stop is assumed to
+      // have reached the NodeManager.
+      context.getResourceAllocator().removeContainer(containerID);
+    } catch (Throwable t) {
+      // Cleanup failures are deliberately ignored apart from the warning.
+      LOG.warn("cleanup failed for container "
+          + this.containerID + " : "
+          + StringUtils.stringifyException(t));
+      this.state = ContainerState.DONE;
+      return;
+    } finally {
+      if (cmProxy != null) {
+        yarnRPC.stopProxy(cmProxy, conf);
+      }
+    }
+
+    this.state = ContainerState.DONE;
+  }
+
+  /**
+   * Supplies the extra arguments appended to the container launch command.
+   *
+   * @return three entries, in order: host name and port of the bind address of
+   *         the TajoWorkerManagerService, and the output path of the query context
+   */
+  protected Vector<CharSequence> getTaskParams() {
+    Vector<CharSequence> params = new Vector<CharSequence>();
+
+    // queryMaster hostname
+    params.add(context.getQueryMasterContext().getWorkerContext()
+        .getTajoWorkerManagerService().getBindAddr().getHostName());
+    // queryMaster port
+    params.add(String.valueOf(context.getQueryMasterContext().getWorkerContext()
+        .getTajoWorkerManagerService().getBindAddr().getPort()));
+    params.add(context.getOutputPath().toString());
+
+    return params;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnTaskRunnerLauncherImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnTaskRunnerLauncherImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnTaskRunnerLauncherImpl.java
new file mode 100644
index 0000000..5ac4fb5
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnTaskRunnerLauncherImpl.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.master.TaskRunnerGroupEvent.EventType;
+import org.apache.tajo.master.querymaster.QueryMasterTask;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * {@link TaskRunnerLauncher} that launches and stops TaskRunner containers through
+ * {@link YarnContainerProxy} instances, running each launch/stop request on a
+ * fixed-size thread pool so that several containers are handled in parallel.
+ */
+public class YarnTaskRunnerLauncherImpl extends AbstractService implements TaskRunnerLauncher {
+
+  /** Class Logger */
+  private static final Log LOG = LogFactory.getLog(YarnTaskRunnerLauncherImpl.class);
+  private final static RecordFactory recordFactory =
+      RecordFactoryProvider.getRecordFactory(null);
+  private QueryMasterTask.QueryContext context;
+
+  // For ContainerLauncherSpec
+  private static AtomicBoolean initialClasspathFlag = new AtomicBoolean();
+  private static String initialClasspath = null;
+  private static final Object classpathLock = new Object();
+  /** Launch context built by the most recent {@link #launchTaskRunners} call. */
+  private ContainerLaunchContext commonContainerSpec = null;
+
+  final public static FsPermission QUERYCONF_FILE_PERMISSION =
+      FsPermission.createImmutable((short) 0644); // rw-r--r--
+
+  /** for launching TaskRunners in parallel */
+  private final ExecutorService executorService;
+
+  private YarnRPC yarnRPC;
+
+  /**
+   * @param context query context supplying the configuration and the resource allocator
+   * @param yarnRPC RPC factory passed on to every created {@link YarnContainerProxy}
+   */
+  public YarnTaskRunnerLauncherImpl(QueryMasterTask.QueryContext context, YarnRPC yarnRPC) {
+    super(YarnTaskRunnerLauncherImpl.class.getName());
+    this.context = context;
+    this.yarnRPC = yarnRPC;
+    executorService = Executors.newFixedThreadPool(
+        context.getConf().getIntVar(TajoConf.ConfVars.AM_TASKRUNNER_LAUNCH_PARALLEL_NUM));
+  }
+
+  @Override
+  public void start() {
+    super.start();
+  }
+
+  /**
+   * Shuts the launcher pool down, waits for it to terminate, then stops every
+   * container still registered with the resource allocator.
+   */
+  @Override
+  public void stop() {
+    executorService.shutdownNow();
+
+    while(!executorService.isTerminated()) {
+      LOG.info("====>executorService.isTerminated:" + executorService.isTerminated() + "," + executorService.isShutdown());
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        // Do not spin forever on an interrupted thread: restore the flag for the
+        // caller and carry on with the container cleanup below.
+        Thread.currentThread().interrupt();
+        LOG.warn("Interrupted while waiting for the launcher thread pool to terminate");
+        break;
+      }
+    }
+    Map<ContainerId, ContainerProxy> containers = context.getResourceAllocator().getContainers();
+    // Iterate over a snapshot: stopping a container may remove it from the
+    // allocator, which would invalidate an iterator over a live map view.
+    ContainerProxy[] snapshot = containers.values().toArray(new ContainerProxy[0]);
+    for(ContainerProxy eachProxy: snapshot) {
+      try {
+        eachProxy.stopContainer();
+      } catch (Exception e) {
+        // best effort: keep stopping the remaining containers
+        LOG.warn("Failed to stop a container while stopping the launcher", e);
+      }
+    }
+    super.stop();
+  }
+
+  /**
+   * Dispatches launch and cleanup requests; other event types are ignored.
+   */
+  @Override
+  public void handle(TaskRunnerGroupEvent event) {
+    if (event.getType() == EventType.CONTAINER_REMOTE_LAUNCH) {
+      launchTaskRunners(event.executionBlockId, event.getContainers());
+    } else if (event.getType() == EventType.CONTAINER_REMOTE_CLEANUP) {
+      stopTaskRunners(event.getContainers());
+    }
+  }
+
+  /**
+   * Creates one {@link YarnContainerProxy} per container and queues its launch.
+   * All containers of one call share the same launch context.
+   */
+  private void launchTaskRunners(ExecutionBlockId executionBlockId, Collection<Container> containers) {
+    commonContainerSpec = YarnContainerProxy.createCommonContainerLaunchContext(getConfig(),
+        executionBlockId.getQueryId().toString(), false);
+    for (Container container : containers) {
+      final ContainerProxy proxy = new YarnContainerProxy(context, getConfig(),
+          yarnRPC, container, executionBlockId);
+      executorService.submit(new LaunchRunner(container.getId(), proxy));
+    }
+  }
+
+  /** Pool task that launches a single container through its proxy. */
+  protected class LaunchRunner implements Runnable {
+    private final ContainerProxy proxy;
+    private final ContainerId id;
+    /**
+     * Launch context captured at construction time, so that a later
+     * launchTaskRunners() call for another execution block cannot swap the
+     * spec under a runner that is still queued.
+     */
+    private final ContainerLaunchContext spec;
+    public LaunchRunner(ContainerId id, ContainerProxy proxy) {
+      this.proxy = proxy;
+      this.id = id;
+      this.spec = commonContainerSpec;
+    }
+    @Override
+    public void run() {
+      try {
+        proxy.launch(spec);
+        LOG.info("ContainerProxy started:" + id);
+      } catch (RuntimeException e) {
+        // The Future returned by submit() is never inspected, so log here.
+        LOG.error("Failed to launch container " + id, e);
+      }
+    }
+  }
+
+  /**
+   * Queues a stop request for every given container that the resource
+   * allocator still knows about.
+   */
+  private void stopTaskRunners(Collection<Container> containers) {
+    for (Container container : containers) {
+      final ContainerProxy proxy = context.getResourceAllocator().getContainer(container.getId());
+      if (proxy == null) {
+        // nothing registered under this id; there is no proxy to stop
+        LOG.warn("No ContainerProxy found for container " + container.getId() + ", skipping stop");
+        continue;
+      }
+      executorService.submit(new StopContainerRunner(container.getId(), proxy));
+    }
+  }
+
+  /** Pool task that stops a single container through its proxy. */
+  private class StopContainerRunner implements Runnable {
+    private final ContainerProxy proxy;
+    private final ContainerId id;
+    public StopContainerRunner(ContainerId id, ContainerProxy proxy) {
+      this.id = id;
+      this.proxy = proxy;
+    }
+
+    @Override
+    public void run() {
+      try {
+        proxy.stopContainer();
+        LOG.info("ContainerProxy stopped:" + id);
+      } catch (RuntimeException e) {
+        // The Future returned by submit() is never inspected, so log here.
+        LOG.error("Failed to stop container " + id, e);
+      }
+    }
+  }
+
+
+  /**
+   * Lock this on initialClasspath so that there is only one fork in the AM for
+   * getting the initial class-path. TODO: We already construct
+   * a parent CLC and use it for all the containers, so this should go away
+   * once the mr-generated-classpath stuff is gone.
+   *
+   * NOTE(review): the environment map below is never populated, so the value
+   * cached and returned here is always null. Confirm whether this method is
+   * still needed.
+   */
+  private static String getInitialClasspath(Configuration conf) {
+    synchronized (classpathLock) {
+      if (initialClasspathFlag.get()) {
+        return initialClasspath;
+      }
+      Map<String, String> env = new HashMap<String, String>();
+
+      initialClasspath = env.get(Environment.CLASSPATH.name());
+      initialClasspathFlag.set(true);
+      return initialClasspath;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/ContainerAllocationEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/ContainerAllocationEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/ContainerAllocationEvent.java
index 6704aa4..ee594a3 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/ContainerAllocationEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/ContainerAllocationEvent.java
@@ -21,11 +21,11 @@ package org.apache.tajo.master.event;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.AbstractEvent;
-import org.apache.tajo.SubQueryId;
+import org.apache.tajo.ExecutionBlockId;
public class ContainerAllocationEvent extends AbstractEvent<ContainerAllocatorEventType> {
- private final SubQueryId subQueryId;
+ private final ExecutionBlockId executionBlockId;
private final Priority priority;
private final Resource resource;
private final boolean isLeafQuery;
@@ -33,13 +33,13 @@ public class ContainerAllocationEvent extends AbstractEvent<ContainerAllocatorEv
private final float progress;
public ContainerAllocationEvent(ContainerAllocatorEventType eventType,
- SubQueryId subQueryId,
- Priority priority,
- Resource resource,
- int requiredNum,
- boolean isLeafQuery, float progress) {
+ ExecutionBlockId executionBlockId,
+ Priority priority,
+ Resource resource,
+ int requiredNum,
+ boolean isLeafQuery, float progress) {
super(eventType);
- this.subQueryId = subQueryId;
+ this.executionBlockId = executionBlockId;
this.priority = priority;
this.resource = resource;
this.requiredNum = requiredNum;
@@ -47,8 +47,8 @@ public class ContainerAllocationEvent extends AbstractEvent<ContainerAllocatorEv
this.progress = progress;
}
- public SubQueryId getSubQueryId() {
- return subQueryId;
+ public ExecutionBlockId getExecutionBlockId() {
+ return executionBlockId;
}
public Priority getPriority() {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/GrouppedContainerAllocatorEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/GrouppedContainerAllocatorEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/GrouppedContainerAllocatorEvent.java
index 44abf30..c34b174 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/GrouppedContainerAllocatorEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/GrouppedContainerAllocatorEvent.java
@@ -20,7 +20,7 @@ package org.apache.tajo.master.event;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.tajo.SubQueryId;
+import org.apache.tajo.ExecutionBlockId;
import java.util.Map;
@@ -29,12 +29,12 @@ public class GrouppedContainerAllocatorEvent
private final Map<String, Integer> requestMap;
public GrouppedContainerAllocatorEvent(ContainerAllocatorEventType eventType,
- SubQueryId subQueryId,
+ ExecutionBlockId executionBlockId,
Priority priority,
Resource resource,
Map<String, Integer> requestMap,
boolean isLeafQuery, float progress) {
- super(eventType, subQueryId, priority,
+ super(eventType, executionBlockId, priority,
resource, requestMap.size(), isLeafQuery, progress);
this.requestMap = requestMap;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryStartEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryStartEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryStartEvent.java
new file mode 100644
index 0000000..6ae8ff7
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryStartEvent.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.event;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.tajo.QueryId;
+
+/**
+ * Event of type {@link EventType#QUERY_START} that carries the id of a query
+ * together with its logical plan string (presumably JSON, per the field name).
+ */
+public class QueryStartEvent extends AbstractEvent<QueryStartEvent.EventType> {
+  public enum EventType {
+    QUERY_START
+  }
+
+  private final QueryId queryId;
+  private final String logicalPlanJson;
+
+  /**
+   * @param queryId         id of the query this event refers to
+   * @param logicalPlanJson logical plan string handed over with the event
+   */
+  public QueryStartEvent(QueryId queryId, String logicalPlanJson) {
+    super(EventType.QUERY_START);
+    this.queryId = queryId;
+    this.logicalPlanJson = logicalPlanJson;
+  }
+
+  /** @return the query id given at construction */
+  public QueryId getQueryId() {
+    return queryId;
+  }
+
+  /** @return the logical plan string given at construction */
+  public String getLogicalPlanJson() {
+    return logicalPlanJson;
+  }
+
+  /** @return class name, event type and query id, comma separated */
+  @Override
+  public String toString() {
+    return getClass().getName() + "," + getType() + "," + queryId;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QuerySubQueryEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QuerySubQueryEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QuerySubQueryEvent.java
index 10b67fe..ae36a69 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QuerySubQueryEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QuerySubQueryEvent.java
@@ -18,18 +18,18 @@
package org.apache.tajo.master.event;
-import org.apache.tajo.SubQueryId;
+import org.apache.tajo.ExecutionBlockId;
public class QuerySubQueryEvent extends QueryEvent {
- private SubQueryId subQueryId;
+ private ExecutionBlockId executionBlockId;
- public QuerySubQueryEvent(final SubQueryId id,
+ public QuerySubQueryEvent(final ExecutionBlockId id,
final QueryEventType queryEvent) {
super(id.getQueryId(), queryEvent);
- this.subQueryId = id;
+ this.executionBlockId = id;
}
- public SubQueryId getSubQueryId() {
- return this.subQueryId;
+ public ExecutionBlockId getExecutionBlockId() {
+ return this.executionBlockId;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryCompletedEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryCompletedEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryCompletedEvent.java
index 26c7231..7e07525 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryCompletedEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryCompletedEvent.java
@@ -18,22 +18,22 @@
package org.apache.tajo.master.event;
-import org.apache.tajo.SubQueryId;
+import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.master.querymaster.SubQueryState;
public class SubQueryCompletedEvent extends QueryEvent {
- private final SubQueryId subQueryId;
+ private final ExecutionBlockId executionBlockId;
private final SubQueryState finalState;
- public SubQueryCompletedEvent(final SubQueryId subQueryId,
+ public SubQueryCompletedEvent(final ExecutionBlockId executionBlockId,
SubQueryState finalState) {
- super(subQueryId.getQueryId(), QueryEventType.SUBQUERY_COMPLETED);
- this.subQueryId = subQueryId;
+ super(executionBlockId.getQueryId(), QueryEventType.SUBQUERY_COMPLETED);
+ this.executionBlockId = executionBlockId;
this.finalState = finalState;
}
- public SubQueryId getSubQueryId() {
- return subQueryId;
+ public ExecutionBlockId getExecutionBlockId() {
+ return executionBlockId;
}
public SubQueryState getFinalState() {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryContainerAllocationEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryContainerAllocationEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryContainerAllocationEvent.java
index 5c0ef9a..a8f4800 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryContainerAllocationEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryContainerAllocationEvent.java
@@ -19,14 +19,14 @@
package org.apache.tajo.master.event;
import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.tajo.SubQueryId;
+import org.apache.tajo.ExecutionBlockId;
import java.util.List;
public class SubQueryContainerAllocationEvent extends SubQueryEvent {
private List<Container> allocatedContainer;
- public SubQueryContainerAllocationEvent(final SubQueryId id,
+ public SubQueryContainerAllocationEvent(final ExecutionBlockId id,
List<Container> allocatedContainer) {
super(id, SubQueryEventType.SQ_CONTAINER_ALLOCATED);
this.allocatedContainer = allocatedContainer;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryEvent.java
index 11470ed..2b3d598 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryEvent.java
@@ -19,17 +19,17 @@
package org.apache.tajo.master.event;
import org.apache.hadoop.yarn.event.AbstractEvent;
-import org.apache.tajo.SubQueryId;
+import org.apache.tajo.ExecutionBlockId;
public class SubQueryEvent extends AbstractEvent<SubQueryEventType> {
- private final SubQueryId id;
+ private final ExecutionBlockId id;
- public SubQueryEvent(SubQueryId id, SubQueryEventType subQueryEventType) {
+ public SubQueryEvent(ExecutionBlockId id, SubQueryEventType subQueryEventType) {
super(subQueryEventType);
this.id = id;
}
- public SubQueryId getSubQueryId() {
+ public ExecutionBlockId getSubQueryId() {
return id;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQuerySucceeEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQuerySucceeEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQuerySucceeEvent.java
index d85d4f2..e02196a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQuerySucceeEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQuerySucceeEvent.java
@@ -18,14 +18,14 @@
package org.apache.tajo.master.event;
-import org.apache.tajo.SubQueryId;
+import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.master.querymaster.SubQueryState;
public class SubQuerySucceeEvent extends SubQueryCompletedEvent {
private final TableMeta tableMeta;
- public SubQuerySucceeEvent(final SubQueryId id, TableMeta tableMeta) {
+ public SubQuerySucceeEvent(final ExecutionBlockId id, TableMeta tableMeta) {
super(id, SubQueryState.SUCCEEDED);
this.tableMeta = tableMeta;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryTaskEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryTaskEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryTaskEvent.java
index 0315236..0217f20 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryTaskEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryTaskEvent.java
@@ -27,7 +27,7 @@ public class SubQueryTaskEvent extends SubQueryEvent {
private QueryUnitId taskId;
public SubQueryTaskEvent(QueryUnitId taskId,
SubQueryEventType subQueryEventType) {
- super(taskId.getSubQueryId(), subQueryEventType);
+ super(taskId.getExecutionBlockId(), subQueryEventType);
this.taskId = taskId;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptStatusUpdateEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptStatusUpdateEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptStatusUpdateEvent.java
index bc84011..d980e05 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptStatusUpdateEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptStatusUpdateEvent.java
@@ -19,7 +19,7 @@
package org.apache.tajo.master.event;
import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.ipc.QueryMasterProtocol.TaskStatusProto;
+import org.apache.tajo.ipc.TajoWorkerProtocol.TaskStatusProto;
public class TaskAttemptStatusUpdateEvent extends TaskAttemptEvent {
private final TaskStatusProto status;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskCompletionEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskCompletionEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskCompletionEvent.java
index e3a4b5f..3ee389a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskCompletionEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskCompletionEvent.java
@@ -19,7 +19,7 @@
package org.apache.tajo.master.event;
import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.ipc.QueryMasterProtocol.TaskCompletionReport;
+import org.apache.tajo.ipc.TajoWorkerProtocol.TaskCompletionReport;
public class TaskCompletionEvent extends TaskAttemptEvent {
private TaskCompletionReport report;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java
index 06fb392..d70de8a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java
@@ -19,7 +19,7 @@
package org.apache.tajo.master.event;
import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.ipc.QueryMasterProtocol.TaskFatalErrorReport;
+import org.apache.tajo.ipc.TajoWorkerProtocol.TaskFatalErrorReport;
public class TaskFatalErrorEvent extends TaskAttemptEvent {
private TaskFatalErrorReport report;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java
index 166e103..9be7cab 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java
@@ -21,7 +21,8 @@ package org.apache.tajo.master.event;
import com.google.protobuf.RpcCallback;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.event.AbstractEvent;
-import org.apache.tajo.ipc.QueryMasterProtocol.QueryUnitRequestProto;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.ipc.TajoWorkerProtocol.QueryUnitRequestProto;
import org.apache.tajo.master.event.TaskRequestEvent.TaskRequestEventType;
public class TaskRequestEvent extends AbstractEvent<TaskRequestEventType> {
@@ -31,12 +32,16 @@ public class TaskRequestEvent extends AbstractEvent<TaskRequestEventType> {
}
private final ContainerId workerId;
+ private final ExecutionBlockId executionBlockId;
+
private final RpcCallback<QueryUnitRequestProto> callback;
public TaskRequestEvent(ContainerId workerId,
+ ExecutionBlockId executionBlockId,
RpcCallback<QueryUnitRequestProto> callback) {
super(TaskRequestEventType.TASK_REQ);
this.workerId = workerId;
+ this.executionBlockId = executionBlockId;
this.callback = callback;
}
@@ -44,6 +49,10 @@ public class TaskRequestEvent extends AbstractEvent<TaskRequestEventType> {
return this.workerId;
}
+ public ExecutionBlockId getExecutionBlockId() {
+ return executionBlockId;
+ }
+
public RpcCallback<QueryUnitRequestProto> getCallback() {
return this.callback;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskScheduleEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskScheduleEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskScheduleEvent.java
index 1f87356..f460203 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskScheduleEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskScheduleEvent.java
@@ -34,7 +34,7 @@ public class TaskScheduleEvent extends TaskSchedulerEvent {
final EventType eventType, boolean isLeafQuery,
final List<QueryUnit.DataLocation> dataLocations,
final String[] racks) {
- super(eventType, attemptId.getSubQueryId());
+ super(eventType, attemptId.getQueryUnitId().getExecutionBlockId());
this.attemptId = attemptId;
this.isLeafQuery = isLeafQuery;
this.dataLocations = dataLocations;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskSchedulerEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskSchedulerEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskSchedulerEvent.java
index d73bb87..71d8587 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskSchedulerEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskSchedulerEvent.java
@@ -19,7 +19,7 @@
package org.apache.tajo.master.event;
import org.apache.hadoop.yarn.event.AbstractEvent;
-import org.apache.tajo.SubQueryId;
+import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
public class TaskSchedulerEvent extends AbstractEvent<EventType> {
@@ -28,14 +28,14 @@ public class TaskSchedulerEvent extends AbstractEvent<EventType> {
T_SUBQUERY_COMPLETED
}
- private final SubQueryId subQueryId;
+ private final ExecutionBlockId executionBlockId;
- public TaskSchedulerEvent(EventType eventType, SubQueryId subQueryId) {
+ public TaskSchedulerEvent(EventType eventType, ExecutionBlockId queryBlockId) {
super(eventType);
- this.subQueryId = subQueryId;
+ this.executionBlockId = queryBlockId;
}
- public SubQueryId getSubQueryId() {
- return this.subQueryId;
+ public ExecutionBlockId getExecutionBlockId() {
+ return this.executionBlockId;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
index 3179abf..99b7c62 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
@@ -25,9 +25,9 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.state.*;
+import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.QueryConf;
import org.apache.tajo.QueryId;
-import org.apache.tajo.SubQueryId;
import org.apache.tajo.TajoProtos.QueryState;
import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.catalog.TableDescImpl;
@@ -35,7 +35,6 @@ import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.master.ExecutionBlock;
import org.apache.tajo.master.ExecutionBlockCursor;
import org.apache.tajo.master.event.*;
-import org.apache.tajo.master.querymaster.QueryMaster.QueryContext;
import org.apache.tajo.storage.StorageManager;
import java.io.IOException;
@@ -50,16 +49,15 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
public class Query implements EventHandler<QueryEvent> {
private static final Log LOG = LogFactory.getLog(Query.class);
-
// Facilities for Query
private final QueryConf conf;
private final Clock clock;
private String queryStr;
- private Map<SubQueryId, SubQuery> subqueries;
+ private Map<ExecutionBlockId, SubQuery> subqueries;
private final EventHandler eventHandler;
private final MasterPlan plan;
private final StorageManager sm;
- private QueryContext context;
+ private QueryMasterTask.QueryContext context;
private ExecutionBlockCursor cursor;
// Query Status
@@ -106,22 +104,21 @@ public class Query implements EventHandler<QueryEvent> {
.installTopology();
- public Query(final QueryContext context, final QueryId id, Clock clock,
+ public Query(final QueryMasterTask.QueryContext context, final QueryId id,
final long appSubmitTime,
final String queryStr,
final EventHandler eventHandler,
- final MasterPlan plan,
- final StorageManager sm) {
+ final MasterPlan plan) {
this.context = context;
this.conf = context.getConf();
this.id = id;
- this.clock = clock;
+ this.clock = context.getClock();
this.appSubmitTime = appSubmitTime;
this.queryStr = queryStr;
subqueries = Maps.newHashMap();
this.eventHandler = eventHandler;
this.plan = plan;
- this.sm = sm;
+ this.sm = context.getStorageManager();
cursor = new ExecutionBlockCursor(plan);
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
@@ -131,23 +128,19 @@ public class Query implements EventHandler<QueryEvent> {
stateMachine = stateMachineFactory.make(this);
}
- public boolean isCreateTableStmt() {
- return context.isCreateTableQuery();
- }
-
-// protected FileSystem getFileSystem(Configuration conf) throws IOException {
-// return FileSystem.get(conf);
-// }
-
public float getProgress() {
QueryState state = getStateMachine().getCurrentState();
if (state == QueryState.QUERY_SUCCEEDED) {
return 1.0f;
} else {
int idx = 0;
- float [] subProgresses = new float[subqueries.size()];
+ List<SubQuery> tempSubQueries = new ArrayList<SubQuery>();
+ synchronized(subqueries) {
+ tempSubQueries.addAll(subqueries.values());
+ }
+ float [] subProgresses = new float[tempSubQueries.size()];
boolean finished = true;
- for (SubQuery subquery: subqueries.values()) {
+ for (SubQuery subquery: tempSubQueries) {
if (subquery.getState() != SubQueryState.NEW) {
subProgresses[idx] = subquery.getProgress();
if (finished && subquery.getState() != SubQueryState.SUCCEEDED) {
@@ -239,8 +232,8 @@ public class Query implements EventHandler<QueryEvent> {
public QueryId getId() {
return this.id;
}
-
- public SubQuery getSubQuery(SubQueryId id) {
+
+ public SubQuery getSubQuery(ExecutionBlockId id) {
return this.subqueries.get(id);
}
@@ -263,7 +256,7 @@ public class Query implements EventHandler<QueryEvent> {
@Override
public QueryState transition(Query query, QueryEvent queryEvent) {
query.setStartTime();
- query.context.setState(QueryState.QUERY_INIT);
+ //query.context.setState(QueryState.QUERY_INIT);
return QueryState.QUERY_INIT;
}
}
@@ -277,7 +270,8 @@ public class Query implements EventHandler<QueryEvent> {
query.sm);
subQuery.setPriority(query.priority--);
query.addSubQuery(subQuery);
- LOG.info("Schedule unit plan: \n" + subQuery.getBlock().getPlan());
+ LOG.debug("Schedule unit plan: \n" + subQuery.getBlock().getPlan());
+
subQuery.handle(new SubQueryEvent(subQuery.getId(),
SubQueryEventType.SQ_INIT));
}
@@ -301,13 +295,16 @@ public class Query implements EventHandler<QueryEvent> {
query.addSubQuery(nextSubQuery);
nextSubQuery.handle(new SubQueryEvent(nextSubQuery.getId(),
SubQueryEventType.SQ_INIT));
- LOG.info("Scheduling SubQuery's Priority: " + nextSubQuery.getPriority());
- LOG.info("Scheduling SubQuery's Plan: \n" + nextSubQuery.getBlock().getPlan());
+ LOG.info("Scheduling SubQuery:" + nextSubQuery.getId());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Scheduling SubQuery's Priority: " + nextSubQuery.getPriority());
+ LOG.debug("Scheduling SubQuery's Plan: \n" + nextSubQuery.getBlock().getPlan());
+ }
return query.checkQueryForCompleted();
} else { // Finish a query
if (query.checkQueryForCompleted() == QueryState.QUERY_SUCCEEDED) {
- SubQuery subQuery = query.getSubQuery(castEvent.getSubQueryId());
+ SubQuery subQuery = query.getSubQuery(castEvent.getExecutionBlockId());
TableDesc desc = new TableDescImpl(query.conf.getOutputTable(),
subQuery.getTableMeta(), query.context.getOutputPath());
query.setResultDesc(desc);
@@ -319,7 +316,7 @@ public class Query implements EventHandler<QueryEvent> {
query.eventHandler.handle(new QueryFinishEvent(query.getId()));
if (query.context.isCreateTableQuery()) {
- // TOOD move to QueryMasterManager
+ // TODO move to QueryJobManager
//query.context.getCatalog().addTable(desc);
}
}
@@ -363,7 +360,6 @@ public class Query implements EventHandler<QueryEvent> {
public QueryState finished(QueryState finalState) {
setFinishTime();
- context.setState(finalState);
return finalState;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
new file mode 100644
index 0000000..2e2870f
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
@@ -0,0 +1,285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.querymaster;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.service.CompositeService;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.engine.planner.logical.LogicalRootNode;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.master.TajoMaster;
+import org.apache.tajo.master.rm.WorkerResource;
+import org.apache.tajo.master.rm.WorkerResourceManager;
+import org.apache.tajo.rpc.NullCallback;
+import org.apache.tajo.rpc.ProtoAsyncRpcClient;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Master-side handle for one submitted query.
+ *
+ * <p>Holds the query's {@link QueryInfo}, owns an {@link AsyncDispatcher} that
+ * processes {@link QueryJobEvent}s for this query, asks the master's
+ * {@link WorkerResourceManager} to allocate/start/stop the QueryMaster, and
+ * submits the logical plan to the QueryMaster over RPC once connection
+ * information is available.
+ */
+public class QueryInProgress extends CompositeService {
+  private static final Log LOG = LogFactory.getLog(QueryInProgress.class.getName());
+
+  /** Pause, in milliseconds, between two "is the QueryMaster stopped?" checks in {@link #stop()}. */
+  private static final long QUERY_MASTER_STOP_POLL_INTERVAL_MS = 1000;
+
+  /** Maximum time, in milliseconds, {@link #stop()} waits for the QueryMaster to report stopped. */
+  private static final long QUERY_MASTER_STOP_TIMEOUT_MS = 60 * 1000;
+
+  private final QueryId queryId;
+
+  /** Created and registered as a child service in {@link #init(Configuration)}. */
+  private AsyncDispatcher dispatcher;
+
+  private final LogicalRootNode plan;
+
+  /** Set to true after the executeQuery RPC has been issued. */
+  private final AtomicBoolean querySubmitted = new AtomicBoolean(false);
+
+  /** Set to true by the first call to {@link #stop()}; later calls return immediately. */
+  private final AtomicBoolean stopped = new AtomicBoolean(false);
+
+  private final QueryInfo queryInfo;
+
+  private final TajoMaster.MasterContext masterContext;
+
+  /** RPC client to the QueryMaster; null until {@link #connectQueryMaster()} succeeds. */
+  private ProtoAsyncRpcClient queryMasterRpc;
+
+  /** Stub obtained from {@link #queryMasterRpc}; null until connected. */
+  private TajoWorkerProtocol.TajoWorkerProtocolService queryMasterRpcClient;
+
+  /**
+   * Creates the handle and a fresh {@link QueryInfo} whose start time is the
+   * current wall-clock time.
+   *
+   * @param masterContext context used to reach the resource manager
+   * @param queryId id of the query being tracked
+   * @param sql SQL text stored in the {@link QueryInfo}
+   * @param plan logical plan that is serialized to JSON and sent to the QueryMaster
+   */
+  public QueryInProgress(
+      TajoMaster.MasterContext masterContext,
+      QueryId queryId, String sql, LogicalRootNode plan) {
+    super(QueryInProgress.class.getName());
+    this.masterContext = masterContext;
+    this.queryId = queryId;
+    this.plan = plan;
+
+    queryInfo = new QueryInfo(queryId, sql);
+    queryInfo.setStartTime(System.currentTimeMillis());
+  }
+
+  /**
+   * Creates the dispatcher, adds it as a child service and registers the
+   * handler for {@link QueryJobEvent.Type} events before delegating to the
+   * superclass.
+   */
+  @Override
+  public void init(Configuration conf) {
+    dispatcher = new AsyncDispatcher();
+    this.addService(dispatcher);
+
+    dispatcher.register(QueryJobEvent.Type.class, new QueryInProgressEventHandler());
+    super.init(conf);
+  }
+
+  /**
+   * Stops the QueryMaster of this query, waits (bounded) for it to report
+   * stopped, closes the RPC client if one was opened, and stops this service.
+   * Only the first invocation does any work.
+   */
+  @Override
+  public void stop() {
+    // A single atomic check-and-mark: exactly one caller gets past this point.
+    if(!stopped.compareAndSet(false, true)) {
+      return;
+    }
+    LOG.info("=========================================================");
+    LOG.info("Stop query:" + queryId);
+
+    masterContext.getResourceManager().stopQueryMaster(queryId);
+    waitForQueryMasterStop();
+
+    if(queryMasterRpc != null) {
+      //TODO release to connection pool
+      queryMasterRpc.close();
+    }
+    super.stop();
+  }
+
+  /**
+   * Polls the resource manager until it reports the QueryMaster as stopped,
+   * the timeout elapses, or the calling thread is interrupted. Failures of an
+   * individual check are logged and the polling continues.
+   */
+  private void waitForQueryMasterStop() {
+    long startTime = System.currentTimeMillis();
+    while(true) {
+      try {
+        if(masterContext.getResourceManager().isQueryMasterStopped(queryId)) {
+          LOG.info("====> " + queryId + " QueryMaster stopped");
+          return;
+        }
+      } catch (Exception e) {
+        LOG.error(e.getMessage(), e);
+      }
+
+      try {
+        Thread.sleep(QUERY_MASTER_STOP_POLL_INTERVAL_MS);
+      } catch (InterruptedException e) {
+        // Keep the interruption visible to whoever runs this thread.
+        Thread.currentThread().interrupt();
+        return;
+      }
+      if(System.currentTimeMillis() - startTime > QUERY_MASTER_STOP_TIMEOUT_MS) {
+        LOG.warn("Failed to stop QueryMaster:" + queryId);
+        return;
+      }
+    }
+  }
+
+  @Override
+  public void start() {
+    super.start();
+  }
+
+  /**
+   * @return the event handler of this query's dispatcher; events passed to it
+   *         are delivered to {@link QueryInProgressEventHandler}
+   */
+  public EventHandler getEventHandler() {
+    return dispatcher.getEventHandler();
+  }
+
+  /**
+   * Asks the resource manager to allocate a QueryMaster for this query,
+   * records the returned resource in the {@link QueryInfo} when it is
+   * non-null, and dispatches a {@code QUERY_MASTER_START} event. Any exception
+   * is handed to {@link #catchException(Exception)}.
+   */
+  public void startQueryMaster() {
+    try {
+      LOG.info("Initializing QueryInProgress for QueryID=" + queryId);
+      WorkerResourceManager resourceManager = masterContext.getResourceManager();
+      WorkerResource queryMasterResource = resourceManager.allocateQueryMaster(this);
+
+      if(queryMasterResource != null) {
+        queryInfo.setQueryMasterResource(queryMasterResource);
+      }
+      getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_MASTER_START, queryInfo));
+    } catch (Exception e) {
+      catchException(e);
+    }
+  }
+
+  /**
+   * Routes each {@link QueryJobEvent} to the matching action of the enclosing
+   * {@link QueryInProgress}. Event types not listed here are ignored.
+   */
+  class QueryInProgressEventHandler implements EventHandler<QueryJobEvent> {
+    @Override
+    public void handle(QueryJobEvent queryJobEvent) {
+      QueryJobEvent.Type eventType = queryJobEvent.getType();
+      if(eventType == QueryJobEvent.Type.QUERY_JOB_HEARTBEAT) {
+        heartbeat(queryJobEvent.getQueryInfo());
+      } else if(eventType == QueryJobEvent.Type.QUERY_MASTER_START) {
+        masterContext.getResourceManager().startQueryMaster(QueryInProgress.this);
+      } else if(eventType == QueryJobEvent.Type.QUERY_JOB_START) {
+        submitQueryToMaster();
+      } else if(eventType == QueryJobEvent.Type.QUERY_JOB_FINISH) {
+        stop();
+      }
+    }
+  }
+
+  /**
+   * @return the RPC stub for the QueryMaster, or null if no connection has
+   *         been established yet
+   */
+  public TajoWorkerProtocol.TajoWorkerProtocolService getQueryMasterRpcClient() {
+    return queryMasterRpcClient;
+  }
+
+  /**
+   * Opens the RPC client to the QueryMaster when the {@link QueryInfo} carries
+   * a resource with an allocated host; otherwise leaves the client fields
+   * untouched.
+   */
+  private void connectQueryMaster() throws Exception {
+    if(queryInfo.getQueryMasterResource() != null &&
+        queryInfo.getQueryMasterResource().getAllocatedHost() != null) {
+      InetSocketAddress addr = NetUtils.createSocketAddr(
+          queryInfo.getQueryMasterHost() + ":" + queryInfo.getQueryMasterPort());
+      LOG.info("Connect to QueryMaster:" + addr);
+      //TODO Get Connection from pool
+      queryMasterRpc = new ProtoAsyncRpcClient(TajoWorkerProtocol.class, addr);
+      queryMasterRpcClient = queryMasterRpc.getStub();
+    }
+  }
+
+  /**
+   * Sends the query id and the JSON form of the logical plan to the
+   * QueryMaster via {@code executeQuery}, connecting first if necessary.
+   * Does nothing if the query was already submitted or if no connection
+   * information is available yet. Failures are logged and leave
+   * {@link #querySubmitted} false.
+   */
+  private synchronized void submitQueryToMaster() {
+    if(querySubmitted.get()) {
+      return;
+    }
+
+    try {
+      if(queryMasterRpcClient == null) {
+        connectQueryMaster();
+      }
+      if(queryMasterRpcClient == null) {
+        LOG.info("No QueryMaster conneciton info.");
+        //TODO wait
+        return;
+      }
+      LOG.info("====>Call executeQuery to :" +
+          queryInfo.getQueryMasterHost() + ":" + queryInfo.getQueryMasterPort() + "," + queryId);
+      queryMasterRpcClient.executeQuery(
+          null,
+          TajoWorkerProtocol.QueryExecutionRequestProto.newBuilder()
+              .setQueryId(queryId.getProto())
+              .setLogicalPlanJson(PrimitiveProtos.StringProto.newBuilder().setValue(plan.toJson()).build())
+              .build(), NullCallback.get());
+      querySubmitted.set(true);
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Logs the exception, marks the query as {@code QUERY_FAILED} and stores the
+   * stringified stack trace as the last message.
+   */
+  public void catchException(Exception e) {
+    LOG.error(e.getMessage(), e);
+    queryInfo.setQueryState(TajoProtos.QueryState.QUERY_FAILED);
+    queryInfo.setLastMessage(StringUtils.stringifyException(e));
+  }
+
+  public QueryId getQueryId() {
+    return queryId;
+  }
+
+  public QueryInfo getQueryInfo() {
+    return this.queryInfo;
+  }
+
+  /**
+   * Merges a heartbeat into this query's {@link QueryInfo} and dispatches
+   * follow-up events: {@code QUERY_JOB_START} while the query has not been
+   * submitted, and {@code QUERY_JOB_FINISH} once the state is terminal.
+   *
+   * @param reported query information carried by the heartbeat event
+   */
+  private void heartbeat(QueryInfo reported) {
+    LOG.info("Received QueryMaster heartbeat:" + reported);
+    if(reported.getQueryMasterResource() != null) {
+      this.queryInfo.setQueryMasterResource(reported.getQueryMasterResource());
+    }
+    this.queryInfo.setQueryState(reported.getQueryState());
+
+    if(reported.getLastMessage() != null && !reported.getLastMessage().isEmpty()) {
+      this.queryInfo.setLastMessage(reported.getLastMessage());
+      LOG.info(queryId + reported.getLastMessage());
+    }
+    if(this.queryInfo.getQueryState() == TajoProtos.QueryState.QUERY_FAILED) {
+      //TODO needed QueryMaster's detail status(failed before or after launching worker)
+      LOG.warn(queryId + " failed, " + reported.getLastMessage());
+    }
+
+    // NOTE(review): QUERY_JOB_START is dispatched even when the reported state
+    // is already terminal - confirm whether that is intended.
+    if(!querySubmitted.get()) {
+      getEventHandler().handle(
+          new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_START, this.queryInfo));
+    }
+
+    if(isFinishState(this.queryInfo.getQueryState())) {
+      getEventHandler().handle(
+          new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_FINISH, this.queryInfo));
+    }
+  }
+
+  /** @return true if the state is failed, killed or succeeded */
+  private boolean isFinishState(TajoProtos.QueryState state) {
+    return state == TajoProtos.QueryState.QUERY_FAILED ||
+        state == TajoProtos.QueryState.QUERY_KILLED ||
+        state == TajoProtos.QueryState.QUERY_SUCCEEDED;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
new file mode 100644
index 0000000..e7ceae7
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.querymaster;
+
+
+import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.master.rm.WorkerResource;
+
+public class QueryInfo {
+ private QueryId queryId;
+ private String sql;
+ private TajoProtos.QueryState queryState;
+ private float progress;
+ private long startTime;
+ private long finishTime;
+ private String lastMessage;
+ private WorkerResource queryMasterResource;
+
+ public QueryInfo(QueryId queryId) {
+ this(queryId, null);
+ }
+
+ public QueryInfo(QueryId queryId, String sql) {
+ this.queryId = queryId;
+ this.sql = sql;
+ this.queryState = TajoProtos.QueryState.QUERY_MASTER_INIT;
+ }
+
+ public QueryId getQueryId() {
+ return queryId;
+ }
+
+ public String getSql() {
+ return sql;
+ }
+
+ public String getQueryMasterHost() {
+ if(queryMasterResource == null) {
+ return null;
+ }
+ return queryMasterResource.getAllocatedHost();
+ }
+
+ public void setQueryMasterResource(WorkerResource queryMasterResource) {
+ this.queryMasterResource = queryMasterResource;
+ }
+
+ public int getQueryMasterPort() {
+ if(queryMasterResource == null) {
+ return 0;
+ }
+ return queryMasterResource.getPorts()[0];
+ }
+
+ public int getQueryMasterClientPort() {
+ if(queryMasterResource == null) {
+ return 0;
+ }
+ return queryMasterResource.getPorts()[1];
+ }
+
+ public TajoProtos.QueryState getQueryState() {
+ return queryState;
+ }
+
+ public void setQueryState(TajoProtos.QueryState queryState) {
+ this.queryState = queryState;
+ }
+
+ public long getStartTime() {
+ return startTime;
+ }
+
+ public void setStartTime(long startTime) {
+ this.startTime = startTime;
+ }
+
+ public long getFinishTime() {
+ return finishTime;
+ }
+
+ public void setFinishTime(long finishTime) {
+ this.finishTime = finishTime;
+ }
+
+ public String getLastMessage() {
+ return lastMessage;
+ }
+
+ public void setLastMessage(String lastMessage) {
+ this.lastMessage = lastMessage;
+ }
+
+ public WorkerResource getQueryMasterResource() {
+ return queryMasterResource;
+ }
+
+ public float getProgress() {
+ return progress;
+ }
+
+ public void setProgress(float progress) {
+ this.progress = progress;
+ }
+
+ @Override
+ public String toString() {
+ return queryId.toString() + ", queryMaster=" + queryMasterResource;
+ }
+}