You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/06/13 11:34:03 UTC
git commit: TAJO-51: Parallel Container Launch of
TaskRunnerLauncherImpl. (hyunsik)
Updated Branches:
refs/heads/master d3e276e9f -> 9ca8d365d
TAJO-51: Parallel Container Launch of TaskRunnerLauncherImpl. (hyunsik)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/9ca8d365
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/9ca8d365
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/9ca8d365
Branch: refs/heads/master
Commit: 9ca8d365df1dc95a87e6260d4659944ebd147d56
Parents: d3e276e
Author: Hyunsik Choi <hy...@apache.org>
Authored: Thu Jun 13 18:32:55 2013 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Thu Jun 13 18:32:55 2013 +0900
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../src/main/java/tajo/conf/TajoConf.java | 3 +
.../src/main/java/tajo/master/QueryMaster.java | 10 +-
.../src/main/java/tajo/master/SubQuery.java | 17 +-
.../main/java/tajo/master/TaskRunnerEvent.java | 98 --------
.../java/tajo/master/TaskRunnerGroupEvent.java | 47 ++++
.../java/tajo/master/TaskRunnerLauncher.java | 2 +-
.../tajo/master/TaskRunnerLauncherImpl.java | 223 +++++++++++--------
.../java/tajo/master/TaskSchedulerImpl.java | 6 +-
.../master/event/TaskRunnerLaunchEvent.java | 54 -----
.../tajo/master/event/TaskRunnerStopEvent.java | 29 ---
11 files changed, 194 insertions(+), 297 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9ca8d365/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 57ed6dd..8971e60 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -10,6 +10,8 @@ Release 0.2.0 - unreleased
IMPROVEMENTS
+ TAJO-51: Parallel Container Launch of TaskRunnerLauncherImpl. (hyunsik)
+
TAJO-39 Remove the unused package tajo.engine.plan.global and all files
inside the directory. (hsaputra)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9ca8d365/tajo-common/src/main/java/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/tajo/conf/TajoConf.java b/tajo-common/src/main/java/tajo/conf/TajoConf.java
index 2fc99f5..084a416 100644
--- a/tajo-common/src/main/java/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/tajo/conf/TajoConf.java
@@ -82,6 +82,9 @@ public class TajoConf extends YarnConfiguration {
AM_QUERY_NODE_BLACKLISTING_ENABLE("tajo.query.node-blacklisting.enable", true),
MAX_TASK_FAILURES_PER_TRACKER("tajo.query.maxtaskfailures.per.worker", 3),
AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT("tajo.query.node-blacklisting.ignore-threshold-node-percent", 33),
+ /** the number of TaskRunners to launch in parallel */
+ AM_TASKRUNNER_LAUNCH_PARALLEL_NUM("tajo.master.taskrunnerlauncher.parallel.num", 16),
+
//////////////////////////////////
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9ca8d365/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryMaster.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryMaster.java
index 3f3db05..7317692 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryMaster.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryMaster.java
@@ -42,7 +42,7 @@ import tajo.catalog.CatalogService;
import tajo.conf.TajoConf;
import tajo.engine.planner.global.MasterPlan;
import tajo.master.TajoMaster.MasterContext;
-import tajo.master.TaskRunnerLauncherImpl.Container;
+import tajo.master.TaskRunnerLauncherImpl.ContainerProxy;
import tajo.master.event.*;
import tajo.master.rm.RMContainerAllocator;
import tajo.storage.StorageManager;
@@ -140,7 +140,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
taskRunnerLauncher = new TaskRunnerLauncherImpl(queryContext);
addIfService(taskRunnerLauncher);
- dispatcher.register(TaskRunnerEvent.EventType.class, taskRunnerLauncher);
+ dispatcher.register(TaskRunnerGroupEvent.EventType.class, taskRunnerLauncher);
} catch (Throwable t) {
@@ -224,7 +224,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
public class QueryContext {
private QueryConf conf;
- public Map<ContainerId, Container> containers = new ConcurrentHashMap<ContainerId, Container>();
+ public Map<ContainerId, ContainerProxy> containers = new ConcurrentHashMap<ContainerId, ContainerProxy>();
int minCapability;
int maxCapability;
int numCluster;
@@ -281,7 +281,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
return taskRunnerListener.getBindAddress();
}
- public void addContainer(ContainerId cId, Container container) {
+ public void addContainer(ContainerId cId, ContainerProxy container) {
containers.put(cId, container);
}
@@ -293,7 +293,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
return containers.containsKey(cId);
}
- public Container getContainer(ContainerId cId) {
+ public ContainerProxy getContainer(ContainerId cId) {
return containers.get(cId);
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9ca8d365/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQuery.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQuery.java
index d965982..d353733 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQuery.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQuery.java
@@ -48,13 +48,13 @@ import tajo.engine.planner.logical.GroupbyNode;
import tajo.engine.planner.logical.ScanNode;
import tajo.engine.planner.logical.StoreTableNode;
import tajo.master.QueryMaster.QueryContext;
+import tajo.master.TaskRunnerGroupEvent.EventType;
import tajo.master.event.*;
import tajo.storage.Fragment;
import tajo.storage.StorageManager;
import java.io.IOException;
import java.util.*;
-import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
@@ -376,9 +376,8 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
private void releaseContainers() {
// If there are still live TaskRunners, try to kill the containers.
- for (Entry<ContainerId, Container> entry : containers.entrySet()) {
- eventHandler.handle(new TaskRunnerStopEvent(getId(), entry.getValue()));
- }
+ eventHandler.handle(new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_CLEANUP ,getId(),
+ containers.values()));
}
private void finish() {
@@ -691,13 +690,11 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
subQuery.containers.put(cId, container);
// TODO - This is debugging message. Should be removed
subQuery.i++;
- LOG.info("SubQuery (" + subQuery.getId() + ") has " + subQuery.i + " containers!");
- subQuery.eventHandler.handle(
- new TaskRunnerLaunchEvent(
- subQuery.getId(),
- container,
- container.getResource()));
}
+ LOG.info("SubQuery (" + subQuery.getId() + ") has " + subQuery.i + " containers!");
+ subQuery.eventHandler.handle(
+ new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_LAUNCH,
+ subQuery.getId(), allocationEvent.getAllocatedContainer()));
subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(),
SubQueryEventType.SQ_START));
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9ca8d365/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskRunnerEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskRunnerEvent.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskRunnerEvent.java
deleted file mode 100644
index 4cadebb..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskRunnerEvent.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.master;
-
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerToken;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.event.AbstractEvent;
-import tajo.SubQueryId;
-import tajo.master.TaskRunnerEvent.EventType;
-
-public abstract class TaskRunnerEvent extends AbstractEvent<EventType> {
- public enum EventType {
- CONTAINER_REMOTE_LAUNCH,
- CONTAINER_REMOTE_CLEANUP
- }
-
- protected final SubQueryId subQueryId;
- protected final Container container;
- protected final String containerMgrAddress;
- public TaskRunnerEvent(EventType eventType,
- SubQueryId subQueryId,
- Container container) {
- super(eventType);
- this.subQueryId = subQueryId;
- this.container = container;
- NodeId nodeId = container.getNodeId();
- containerMgrAddress = nodeId.getHost() + ":" + nodeId.getPort();
- }
-
- public ContainerId getContainerId() {
- return container.getId();
- }
-
- public Container getContainer() {
- return container;
- }
-
- public String getContainerMgrAddress() {
- return containerMgrAddress;
- }
-
- public ContainerToken getContainerToken() {
- return container.getContainerToken();
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result
- + container.getId().hashCode();
- result = prime * result
- + containerMgrAddress.hashCode();
- result = prime * result
- + container.getContainerToken().hashCode();
- result = prime * result
- + ((subQueryId == null) ? 0 : subQueryId.hashCode());
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- TaskRunnerEvent other = (TaskRunnerEvent) obj;
- if (!container.getId().equals(other.getContainerId()))
- return false;
- if (!containerMgrAddress.equals(other.containerMgrAddress))
- return false;
- if (!container.getContainerToken().equals(other.getContainerToken()))
- return false;
- if (!subQueryId.equals(other.subQueryId))
- return false;
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9ca8d365/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskRunnerGroupEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskRunnerGroupEvent.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskRunnerGroupEvent.java
new file mode 100644
index 0000000..d1cdbc2
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskRunnerGroupEvent.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package tajo.master;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import tajo.SubQueryId;
+import tajo.master.TaskRunnerGroupEvent.EventType;
+
+import java.util.Collection;
+
+public class TaskRunnerGroupEvent extends AbstractEvent<EventType> {
+ public enum EventType {
+ CONTAINER_REMOTE_LAUNCH,
+ CONTAINER_REMOTE_CLEANUP
+ }
+
+ protected final SubQueryId subQueryId;
+ protected final Collection<Container> containers;
+ public TaskRunnerGroupEvent(EventType eventType,
+ SubQueryId subQueryId,
+ Collection<Container> containers) {
+ super(eventType);
+ this.subQueryId = subQueryId;
+ this.containers = containers;
+ }
+
+ public Collection<Container> getContainers() {
+ return containers;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9ca8d365/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskRunnerLauncher.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskRunnerLauncher.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskRunnerLauncher.java
index 9cbad6b..6b73d00 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskRunnerLauncher.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskRunnerLauncher.java
@@ -20,6 +20,6 @@ package tajo.master;
import org.apache.hadoop.yarn.event.EventHandler;
-public interface TaskRunnerLauncher extends EventHandler<TaskRunnerEvent> {
+public interface TaskRunnerLauncher extends EventHandler<TaskRunnerGroupEvent> {
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9ca8d365/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskRunnerLauncherImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskRunnerLauncherImpl.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskRunnerLauncherImpl.java
index c22e3b4..2279d1b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskRunnerLauncherImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskRunnerLauncherImpl.java
@@ -44,12 +44,12 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.ProtoUtils;
import org.apache.hadoop.yarn.util.Records;
import tajo.QueryConf;
+import tajo.SubQueryId;
import tajo.conf.TajoConf;
import tajo.master.QueryMaster.QueryContext;
-import tajo.master.TaskRunnerEvent.EventType;
+import tajo.master.TaskRunnerGroupEvent.EventType;
import tajo.master.event.QueryEvent;
import tajo.master.event.QueryEventType;
-import tajo.master.event.TaskRunnerLaunchEvent;
import tajo.pullserver.PullServerAuxService;
import java.io.IOException;
@@ -57,6 +57,8 @@ import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.security.PrivilegedAction;
import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
public class TaskRunnerLauncherImpl extends AbstractService implements TaskRunnerLauncher {
@@ -80,12 +82,17 @@ public class TaskRunnerLauncherImpl extends AbstractService implements TaskRunne
final public static FsPermission QUERYCONF_FILE_PERMISSION =
FsPermission.createImmutable((short) 0644); // rw-r--r--
+ /** for launching TaskRunners in parallel */
+ private final ExecutorService executorService;
+
public TaskRunnerLauncherImpl(QueryContext context) {
super(TaskRunnerLauncherImpl.class.getName());
this.context = context;
taskListenerHost = context.getTaskListener().getHostName();
taskListenerPort = context.getTaskListener().getPort();
yarnRPC = context.getYarnRPC();
+ executorService = Executors.newFixedThreadPool(
+ context.getConf().getIntVar(TajoConf.ConfVars.AM_TASKRUNNER_LAUNCH_PARALLEL_NUM));
}
public void start() {
@@ -93,35 +100,57 @@ public class TaskRunnerLauncherImpl extends AbstractService implements TaskRunne
}
public void stop() {
+ executorService.shutdown();
super.stop();
}
@Override
- public void handle(TaskRunnerEvent event) {
-
+ public void handle(TaskRunnerGroupEvent event) {
if (event.getType() == EventType.CONTAINER_REMOTE_LAUNCH) {
- TaskRunnerLaunchEvent castEvent = (TaskRunnerLaunchEvent) event;
- try {
- Container container = new Container(castEvent.getContainerId(),
- castEvent.getContainerMgrAddress(), castEvent.getContainerToken());
- container.launch(castEvent);
-
- } catch (Throwable t) {
- t.printStackTrace();
- }
+ launchTaskRunners(event.subQueryId, event.getContainers());
} else if (event.getType() == EventType.CONTAINER_REMOTE_CLEANUP) {
- try {
- if (context.containsContainer(event.getContainerId())) {
- context.getContainer(event.getContainerId()).kill();
- } else {
- LOG.error("No Such Container: " + event.getContainerId());
- }
- } catch (Throwable t) {
- t.printStackTrace();
- }
+ killTaskRunners(event.getContainers());
}
}
+ private void launchTaskRunners(SubQueryId subQueryId, Collection<Container> containers) {
+ for (Container container : containers) {
+ final ContainerProxy proxy = new ContainerProxy(container, subQueryId);
+ executorService.submit(new LaunchRunner(proxy));
+ }
+ }
+
+ private class LaunchRunner implements Runnable {
+ private final ContainerProxy proxy;
+ public LaunchRunner(ContainerProxy proxy) {
+ this.proxy = proxy;
+ }
+ @Override
+ public void run() {
+ proxy.launch();
+ }
+ }
+
+ private void killTaskRunners(Collection<Container> containers) {
+ for (Container container : containers) {
+ final ContainerProxy proxy = context.getContainer(container.getId());
+ executorService.submit(new KillRunner(proxy));
+ }
+ }
+
+ private class KillRunner implements Runnable {
+ private final ContainerProxy proxy;
+ public KillRunner(ContainerProxy proxy) {
+ this.proxy = proxy;
+ }
+
+ @Override
+ public void run() {
+ proxy.kill();
+ }
+ }
+
+
/**
* Lock this on initialClasspath so that there is only one fork in the AM for
* getting the initial class-path. TODO: We already construct
@@ -248,67 +277,6 @@ public class TaskRunnerLauncherImpl extends AbstractService implements TaskRunne
return ctx;
}
-
- public ContainerLaunchContext createContainerLaunchContext(TaskRunnerLaunchEvent event) {
- synchronized (commonContainerSpecLock) {
- if (commonContainerSpec == null) {
- commonContainerSpec = createCommonContainerLaunchContext();
- }
- }
-
- // Setup environment by cloning from common env.
- Map<String, String> env = commonContainerSpec.getEnvironment();
- Map<String, String> myEnv = new HashMap<String, String>(env.size());
- myEnv.putAll(env);
-
- // Duplicate the ByteBuffers for access by multiple containers.
- Map<String, ByteBuffer> myServiceData = new HashMap<String, ByteBuffer>();
- for (Map.Entry<String, ByteBuffer> entry : commonContainerSpec
- .getServiceData().entrySet()) {
- myServiceData.put(entry.getKey(), entry.getValue().duplicate());
- }
-
- ////////////////////////////////////////////////////////////////////////////
- // Set the local resources
- ////////////////////////////////////////////////////////////////////////////
- // Set the necessary command to execute the application master
- Vector<CharSequence> vargs = new Vector<CharSequence>(30);
-
- // Set java executable command
- //LOG.info("Setting up app master command");
- vargs.add("${JAVA_HOME}" + "/bin/java");
- // Set Xmx based on am memory size
- vargs.add("-Xmx2000m");
- // Set Remote Debugging
- //if (!context.getQuery().getSubQuery(event.getSubQueryId()).isLeafQuery()) {
- //vargs.add("-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5005");
- //}
- // Set class name
- vargs.add("tajo.worker.TaskRunner");
- vargs.add(taskListenerHost); // tasklistener hostname
- vargs.add(String.valueOf(taskListenerPort)); // tasklistener hostname
- vargs.add(event.getSubQueryId().toString()); // subqueryId
- vargs.add(event.getContainerMgrAddress()); // nodeId
- vargs.add(event.getContainerId().toString()); // containerId
-
- vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
- vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
-
- // Get final commmand
- StringBuilder command = new StringBuilder();
- for (CharSequence str : vargs) {
- command.append(str).append(" ");
- }
-
- LOG.info("Completed setting up taskrunner command " + command.toString());
- List<String> commands = new ArrayList<String>();
- commands.add(command.toString());
-
- return BuilderUtils.newContainerLaunchContext(event.getContainerId(), commonContainerSpec.getUser(),
- event.getCapability(), commonContainerSpec.getLocalResources(), myEnv, commands, myServiceData,
- null, new HashMap<ApplicationAccessType, String>());
- }
-
protected ContainerManager getCMProxy(ContainerId containerID,
final String containerManagerBindAddr,
ContainerToken containerToken)
@@ -369,21 +337,25 @@ public class TaskRunnerLauncherImpl extends AbstractService implements TaskRunne
PREP, FAILED, RUNNING, DONE, KILLED_BEFORE_LAUNCH
}
- public class Container {
+ public class ContainerProxy {
private ContainerState state;
// store enough information to be able to cleanup the container
+ private Container container;
private ContainerId containerID;
final private String containerMgrAddress;
private ContainerToken containerToken;
private String hostName;
private int port = -1;
+ private final SubQueryId subQueryId;
- public Container(ContainerId containerID,
- String containerMgrAddress, ContainerToken containerToken) {
+ public ContainerProxy(Container container, SubQueryId subQueryId) {
this.state = ContainerState.PREP;
- this.containerMgrAddress = containerMgrAddress;
- this.containerID = containerID;
- this.containerToken = containerToken;
+ this.container = container;
+ this.containerID = container.getId();
+ NodeId nodeId = container.getNodeId();
+ this.containerMgrAddress = nodeId.getHost() + ":" + nodeId.getPort();;
+ this.containerToken = container.getContainerToken();
+ this.subQueryId = subQueryId;
}
public synchronized boolean isCompletelyDone() {
@@ -391,12 +363,11 @@ public class TaskRunnerLauncherImpl extends AbstractService implements TaskRunne
}
@SuppressWarnings("unchecked")
- public synchronized void launch(TaskRunnerLaunchEvent event) {
- LOG.info("Launching Container with Id: " + event.getContainerId());
+ public synchronized void launch() {
+ LOG.info("Launching Container with Id: " + containerID);
if(this.state == ContainerState.KILLED_BEFORE_LAUNCH) {
state = ContainerState.DONE;
- LOG.error("Container (" + event.getContainerId()
- + " was killed before it was launched");
+ LOG.error("Container (" + containerID + " was killed before it was launched");
return;
}
@@ -407,8 +378,7 @@ public class TaskRunnerLauncherImpl extends AbstractService implements TaskRunne
containerToken);
// Construct the actual Container
- ContainerLaunchContext containerLaunchContext =
- createContainerLaunchContext(event);
+ ContainerLaunchContext containerLaunchContext = createContainerLaunchContext();
// Now launch the actual container
StartContainerRequest startRequest = Records
@@ -440,7 +410,7 @@ public class TaskRunnerLauncherImpl extends AbstractService implements TaskRunne
context.getEventHandler().handle(new QueryEvent(context.getQueryId(), QueryEventType.INIT_COMPLETED));
this.state = ContainerState.RUNNING;
- this.hostName = event.getContainerMgrAddress().split(":")[0];
+ this.hostName = containerMgrAddress.split(":")[0];
context.addContainer(containerID, this);
} catch (Throwable t) {
String message = "Container launch failed for " + containerID + " : "
@@ -454,7 +424,6 @@ public class TaskRunnerLauncherImpl extends AbstractService implements TaskRunne
}
}
- @SuppressWarnings("unchecked")
public synchronized void kill() {
if(isCompletelyDone()) {
@@ -500,6 +469,66 @@ public class TaskRunnerLauncherImpl extends AbstractService implements TaskRunne
}
}
+ public ContainerLaunchContext createContainerLaunchContext() {
+ synchronized (commonContainerSpecLock) {
+ if (commonContainerSpec == null) {
+ commonContainerSpec = createCommonContainerLaunchContext();
+ }
+ }
+
+ // Setup environment by cloning from common env.
+ Map<String, String> env = commonContainerSpec.getEnvironment();
+ Map<String, String> myEnv = new HashMap<String, String>(env.size());
+ myEnv.putAll(env);
+
+ // Duplicate the ByteBuffers for access by multiple containers.
+ Map<String, ByteBuffer> myServiceData = new HashMap<String, ByteBuffer>();
+ for (Map.Entry<String, ByteBuffer> entry : commonContainerSpec
+ .getServiceData().entrySet()) {
+ myServiceData.put(entry.getKey(), entry.getValue().duplicate());
+ }
+
+ ////////////////////////////////////////////////////////////////////////////
+ // Set the local resources
+ ////////////////////////////////////////////////////////////////////////////
+ // Set the necessary command to execute the TaskRunner
+ Vector<CharSequence> vargs = new Vector<CharSequence>(30);
+
+ // Set java executable command
+ //LOG.info("Setting up app master command");
+ vargs.add("${JAVA_HOME}" + "/bin/java");
+ // Set Xmx based on am memory size
+ vargs.add("-Xmx2000m");
+ // Set Remote Debugging
+ //if (!context.getQuery().getSubQuery(event.getSubQueryId()).isLeafQuery()) {
+ //vargs.add("-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5005");
+ //}
+ // Set class name
+ vargs.add("tajo.worker.TaskRunner");
+ vargs.add(taskListenerHost); // tasklistener hostname
+ vargs.add(String.valueOf(taskListenerPort)); // tasklistener port
+ vargs.add(subQueryId.toString()); // subqueryId
+ vargs.add(containerMgrAddress); // nodeId
+ vargs.add(containerID.toString()); // containerId
+
+ vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
+ vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
+
+ // Get final command
+ StringBuilder command = new StringBuilder();
+ for (CharSequence str : vargs) {
+ command.append(str).append(" ");
+ }
+
+ LOG.info("Completed setting up TaskRunner command " + command.toString());
+ List<String> commands = new ArrayList<String>();
+ commands.add(command.toString());
+
+ return BuilderUtils.newContainerLaunchContext(containerID, commonContainerSpec.getUser(),
+ container.getResource(), commonContainerSpec.getLocalResources(), myEnv, commands,
+ myServiceData, null, new HashMap<ApplicationAccessType, String>());
+ }
+
public String getHostName() {
return this.hostName;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9ca8d365/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskSchedulerImpl.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskSchedulerImpl.java
index 816e285..aae88a6 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskSchedulerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskSchedulerImpl.java
@@ -34,7 +34,7 @@ import tajo.engine.planner.logical.ScanNode;
import tajo.engine.query.QueryUnitRequestImpl;
import tajo.ipc.protocolrecords.QueryUnitRequest;
import tajo.master.QueryMaster.QueryContext;
-import tajo.master.TaskRunnerLauncherImpl.Container;
+import tajo.master.TaskRunnerLauncherImpl.ContainerProxy;
import tajo.master.event.TaskAttemptAssignedEvent;
import tajo.master.event.TaskRequestEvent;
import tajo.master.event.TaskRequestEvent.TaskRequestEventType;
@@ -300,7 +300,7 @@ public class TaskSchedulerImpl extends AbstractService
TaskRequestEvent taskRequest;
while (it.hasNext() && leafTasks.size() > 0) {
taskRequest = it.next();
- Container container = context.getContainer(taskRequest.getContainerId());
+ ContainerProxy container = context.getContainer(taskRequest.getContainerId());
String hostName = container.getHostName();
QueryUnitAttemptId attemptId = null;
@@ -408,7 +408,7 @@ public class TaskSchedulerImpl extends AbstractService
}
}
- Container container = context.getContainer(
+ ContainerProxy container = context.getContainer(
taskRequest.getContainerId());
context.getEventHandler().handle(new TaskAttemptAssignedEvent(attemptId,
taskRequest.getContainerId(), container.getHostName(), container.getPullServerPort()));
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9ca8d365/tajo-core/tajo-core-backend/src/main/java/tajo/master/event/TaskRunnerLaunchEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/event/TaskRunnerLaunchEvent.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/event/TaskRunnerLaunchEvent.java
deleted file mode 100644
index 2406b48..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/event/TaskRunnerLaunchEvent.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.master.event;
-
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.Resource;
-import tajo.SubQueryId;
-import tajo.master.TaskRunnerEvent;
-
-public class TaskRunnerLaunchEvent extends TaskRunnerEvent {
-
- private final SubQueryId subQueryId;
- private final Resource resource;
-
- public TaskRunnerLaunchEvent(SubQueryId subQueryId,
- Container container,
- Resource resource) {
- super(EventType.CONTAINER_REMOTE_LAUNCH, subQueryId, container);
- this.subQueryId = subQueryId;
- this.resource = resource;
- }
-
- public SubQueryId getSubQueryId() {
- return this.subQueryId;
- }
-
- public Resource getCapability() {
- return resource;
- }
-
- @Override
- public String toString() {
- return super.toString() + " for container " + getContainerId()
- + " taskAttempt " + subQueryId;
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9ca8d365/tajo-core/tajo-core-backend/src/main/java/tajo/master/event/TaskRunnerStopEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/event/TaskRunnerStopEvent.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/event/TaskRunnerStopEvent.java
deleted file mode 100644
index 7e5a579..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/event/TaskRunnerStopEvent.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.master.event;
-
-import org.apache.hadoop.yarn.api.records.Container;
-import tajo.SubQueryId;
-import tajo.master.TaskRunnerEvent;
-
-public class TaskRunnerStopEvent extends TaskRunnerEvent {
- public TaskRunnerStopEvent(SubQueryId subQueryId, Container container) {
- super(EventType.CONTAINER_REMOTE_CLEANUP, subQueryId, container);
- }
-}