You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/06/13 11:34:03 UTC

git commit: TAJO-51: Parallel Container Launch of TaskRunnerLauncherImpl. (hyunsik)

Updated Branches:
  refs/heads/master d3e276e9f -> 9ca8d365d


TAJO-51: Parallel Container Launch of TaskRunnerLauncherImpl. (hyunsik)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/9ca8d365
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/9ca8d365
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/9ca8d365

Branch: refs/heads/master
Commit: 9ca8d365df1dc95a87e6260d4659944ebd147d56
Parents: d3e276e
Author: Hyunsik Choi <hy...@apache.org>
Authored: Thu Jun 13 18:32:55 2013 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Thu Jun 13 18:32:55 2013 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../src/main/java/tajo/conf/TajoConf.java       |   3 +
 .../src/main/java/tajo/master/QueryMaster.java  |  10 +-
 .../src/main/java/tajo/master/SubQuery.java     |  17 +-
 .../main/java/tajo/master/TaskRunnerEvent.java  |  98 --------
 .../java/tajo/master/TaskRunnerGroupEvent.java  |  47 ++++
 .../java/tajo/master/TaskRunnerLauncher.java    |   2 +-
 .../tajo/master/TaskRunnerLauncherImpl.java     | 223 +++++++++++--------
 .../java/tajo/master/TaskSchedulerImpl.java     |   6 +-
 .../master/event/TaskRunnerLaunchEvent.java     |  54 -----
 .../tajo/master/event/TaskRunnerStopEvent.java  |  29 ---
 11 files changed, 194 insertions(+), 297 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9ca8d365/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 57ed6dd..8971e60 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -10,6 +10,8 @@ Release 0.2.0 - unreleased
 
   IMPROVEMENTS
 
+    TAJO-51: Parallel Container Launch of TaskRunnerLauncherImpl. (hyunsik)
+    
     TAJO-39 Remove the unused package tajo.engine.plan.global and all files 
     inside the directory. (hsaputra)
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9ca8d365/tajo-common/src/main/java/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/tajo/conf/TajoConf.java b/tajo-common/src/main/java/tajo/conf/TajoConf.java
index 2fc99f5..084a416 100644
--- a/tajo-common/src/main/java/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/tajo/conf/TajoConf.java
@@ -82,6 +82,9 @@ public class TajoConf extends YarnConfiguration {
     AM_QUERY_NODE_BLACKLISTING_ENABLE("tajo.query.node-blacklisting.enable", true),
     MAX_TASK_FAILURES_PER_TRACKER("tajo.query.maxtaskfailures.per.worker", 3),
     AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT("tajo.query.node-blacklisting.ignore-threshold-node-percent", 33),
+    /** the number of TaskRunners to launch in parallel */
+    AM_TASKRUNNER_LAUNCH_PARALLEL_NUM("tajo.master.taskrunnerlauncher.parallel.num", 16),
+
 
 
     //////////////////////////////////

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9ca8d365/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryMaster.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryMaster.java
index 3f3db05..7317692 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryMaster.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryMaster.java
@@ -42,7 +42,7 @@ import tajo.catalog.CatalogService;
 import tajo.conf.TajoConf;
 import tajo.engine.planner.global.MasterPlan;
 import tajo.master.TajoMaster.MasterContext;
-import tajo.master.TaskRunnerLauncherImpl.Container;
+import tajo.master.TaskRunnerLauncherImpl.ContainerProxy;
 import tajo.master.event.*;
 import tajo.master.rm.RMContainerAllocator;
 import tajo.storage.StorageManager;
@@ -140,7 +140,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
 
       taskRunnerLauncher = new TaskRunnerLauncherImpl(queryContext);
       addIfService(taskRunnerLauncher);
-      dispatcher.register(TaskRunnerEvent.EventType.class, taskRunnerLauncher);
+      dispatcher.register(TaskRunnerGroupEvent.EventType.class, taskRunnerLauncher);
 
 
     } catch (Throwable t) {
@@ -224,7 +224,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
 
   public class QueryContext {
     private QueryConf conf;
-    public Map<ContainerId, Container> containers = new ConcurrentHashMap<ContainerId, Container>();
+    public Map<ContainerId, ContainerProxy> containers = new ConcurrentHashMap<ContainerId, ContainerProxy>();
     int minCapability;
     int maxCapability;
     int numCluster;
@@ -281,7 +281,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
       return taskRunnerListener.getBindAddress();
     }
 
-    public void addContainer(ContainerId cId, Container container) {
+    public void addContainer(ContainerId cId, ContainerProxy container) {
       containers.put(cId, container);
     }
 
@@ -293,7 +293,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
       return containers.containsKey(cId);
     }
 
-    public Container getContainer(ContainerId cId) {
+    public ContainerProxy getContainer(ContainerId cId) {
       return containers.get(cId);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9ca8d365/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQuery.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQuery.java
index d965982..d353733 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQuery.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/SubQuery.java
@@ -48,13 +48,13 @@ import tajo.engine.planner.logical.GroupbyNode;
 import tajo.engine.planner.logical.ScanNode;
 import tajo.engine.planner.logical.StoreTableNode;
 import tajo.master.QueryMaster.QueryContext;
+import tajo.master.TaskRunnerGroupEvent.EventType;
 import tajo.master.event.*;
 import tajo.storage.Fragment;
 import tajo.storage.StorageManager;
 
 import java.io.IOException;
 import java.util.*;
-import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -376,9 +376,8 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
 
   private void releaseContainers() {
     // If there are still live TaskRunners, try to kill the containers.
-    for (Entry<ContainerId, Container> entry : containers.entrySet()) {
-      eventHandler.handle(new TaskRunnerStopEvent(getId(), entry.getValue()));
-    }
+    eventHandler.handle(new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_CLEANUP ,getId(),
+        containers.values()));
   }
 
   private void finish() {
@@ -691,13 +690,11 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
         subQuery.containers.put(cId, container);
         // TODO - This is debugging message. Should be removed
         subQuery.i++;
-        LOG.info("SubQuery (" + subQuery.getId() + ") has " + subQuery.i + " containers!");
-        subQuery.eventHandler.handle(
-            new TaskRunnerLaunchEvent(
-                subQuery.getId(),
-                container,
-                container.getResource()));
       }
+      LOG.info("SubQuery (" + subQuery.getId() + ") has " + subQuery.i + " containers!");
+      subQuery.eventHandler.handle(
+          new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_LAUNCH,
+              subQuery.getId(), allocationEvent.getAllocatedContainer()));
 
       subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(),
           SubQueryEventType.SQ_START));

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9ca8d365/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskRunnerEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskRunnerEvent.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskRunnerEvent.java
deleted file mode 100644
index 4cadebb..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskRunnerEvent.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.master;
-
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerToken;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.event.AbstractEvent;
-import tajo.SubQueryId;
-import tajo.master.TaskRunnerEvent.EventType;
-
-public abstract class TaskRunnerEvent extends AbstractEvent<EventType> {
-  public enum EventType {
-    CONTAINER_REMOTE_LAUNCH,
-    CONTAINER_REMOTE_CLEANUP
-  }
-
-  protected final SubQueryId subQueryId;
-  protected final Container container;
-  protected final String containerMgrAddress;
-  public TaskRunnerEvent(EventType eventType,
-                         SubQueryId subQueryId,
-                         Container container) {
-    super(eventType);
-    this.subQueryId = subQueryId;
-    this.container = container;
-    NodeId nodeId = container.getNodeId();
-    containerMgrAddress = nodeId.getHost() + ":" + nodeId.getPort();
-  }
-
-  public ContainerId getContainerId() {
-    return container.getId();
-  }
-
-  public Container getContainer() {
-    return container;
-  }
-
-  public String getContainerMgrAddress() {
-    return containerMgrAddress;
-  }
-
-  public ContainerToken getContainerToken() {
-    return container.getContainerToken();
-  }
-
-  @Override
-  public int hashCode() {
-    final int prime = 31;
-    int result = 1;
-    result = prime * result
-        + container.getId().hashCode();
-    result = prime * result
-        + containerMgrAddress.hashCode();
-    result = prime * result
-        + container.getContainerToken().hashCode();
-    result = prime * result
-        + ((subQueryId == null) ? 0 : subQueryId.hashCode());
-    return result;
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (this == obj)
-      return true;
-    if (obj == null)
-      return false;
-    if (getClass() != obj.getClass())
-      return false;
-    TaskRunnerEvent other = (TaskRunnerEvent) obj;
-    if (!container.getId().equals(other.getContainerId()))
-      return false;
-    if (!containerMgrAddress.equals(other.containerMgrAddress))
-      return false;
-    if (!container.getContainerToken().equals(other.getContainerToken()))
-      return false;
-    if (!subQueryId.equals(other.subQueryId))
-      return false;
-    return true;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9ca8d365/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskRunnerGroupEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskRunnerGroupEvent.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskRunnerGroupEvent.java
new file mode 100644
index 0000000..d1cdbc2
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskRunnerGroupEvent.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package tajo.master;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import tajo.SubQueryId;
+import tajo.master.TaskRunnerGroupEvent.EventType;
+
+import java.util.Collection;
+
+public class TaskRunnerGroupEvent extends AbstractEvent<EventType> {
+  public enum EventType {
+    CONTAINER_REMOTE_LAUNCH,
+    CONTAINER_REMOTE_CLEANUP
+  }
+
+  protected final SubQueryId subQueryId;
+  protected final Collection<Container> containers;
+  public TaskRunnerGroupEvent(EventType eventType,
+                              SubQueryId subQueryId,
+                              Collection<Container> containers) {
+    super(eventType);
+    this.subQueryId = subQueryId;
+    this.containers = containers;
+  }
+
+  public Collection<Container> getContainers() {
+    return containers;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9ca8d365/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskRunnerLauncher.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskRunnerLauncher.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskRunnerLauncher.java
index 9cbad6b..6b73d00 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskRunnerLauncher.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskRunnerLauncher.java
@@ -20,6 +20,6 @@ package tajo.master;
 
 import org.apache.hadoop.yarn.event.EventHandler;
 
-public interface TaskRunnerLauncher extends EventHandler<TaskRunnerEvent> {
+public interface TaskRunnerLauncher extends EventHandler<TaskRunnerGroupEvent> {
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9ca8d365/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskRunnerLauncherImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskRunnerLauncherImpl.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskRunnerLauncherImpl.java
index c22e3b4..2279d1b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskRunnerLauncherImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskRunnerLauncherImpl.java
@@ -44,12 +44,12 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.ProtoUtils;
 import org.apache.hadoop.yarn.util.Records;
 import tajo.QueryConf;
+import tajo.SubQueryId;
 import tajo.conf.TajoConf;
 import tajo.master.QueryMaster.QueryContext;
-import tajo.master.TaskRunnerEvent.EventType;
+import tajo.master.TaskRunnerGroupEvent.EventType;
 import tajo.master.event.QueryEvent;
 import tajo.master.event.QueryEventType;
-import tajo.master.event.TaskRunnerLaunchEvent;
 import tajo.pullserver.PullServerAuxService;
 
 import java.io.IOException;
@@ -57,6 +57,8 @@ import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.security.PrivilegedAction;
 import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 public class TaskRunnerLauncherImpl extends AbstractService implements TaskRunnerLauncher {
@@ -80,12 +82,17 @@ public class TaskRunnerLauncherImpl extends AbstractService implements TaskRunne
   final public static FsPermission QUERYCONF_FILE_PERMISSION =
       FsPermission.createImmutable((short) 0644); // rw-r--r--
 
+  /** for launching TaskRunners in parallel */
+  private final ExecutorService executorService;
+
   public TaskRunnerLauncherImpl(QueryContext context) {
     super(TaskRunnerLauncherImpl.class.getName());
     this.context = context;
     taskListenerHost = context.getTaskListener().getHostName();
     taskListenerPort = context.getTaskListener().getPort();
     yarnRPC = context.getYarnRPC();
+    executorService = Executors.newFixedThreadPool(
+        context.getConf().getIntVar(TajoConf.ConfVars.AM_TASKRUNNER_LAUNCH_PARALLEL_NUM));
   }
 
   public void start() {
@@ -93,35 +100,57 @@ public class TaskRunnerLauncherImpl extends AbstractService implements TaskRunne
   }
 
   public void stop() {
+    executorService.shutdown();
     super.stop();
   }
 
   @Override
-  public void handle(TaskRunnerEvent event) {
-
+  public void handle(TaskRunnerGroupEvent event) {
     if (event.getType() == EventType.CONTAINER_REMOTE_LAUNCH) {
-      TaskRunnerLaunchEvent castEvent = (TaskRunnerLaunchEvent) event;
-      try {
-        Container container = new Container(castEvent.getContainerId(),
-            castEvent.getContainerMgrAddress(), castEvent.getContainerToken());
-        container.launch(castEvent);
-
-      } catch (Throwable t) {
-        t.printStackTrace();
-      }
+     launchTaskRunners(event.subQueryId, event.getContainers());
     } else if (event.getType() == EventType.CONTAINER_REMOTE_CLEANUP) {
-      try {
-        if (context.containsContainer(event.getContainerId())) {
-          context.getContainer(event.getContainerId()).kill();
-        } else {
-          LOG.error("No Such Container: " + event.getContainerId());
-        }
-      } catch (Throwable t) {
-        t.printStackTrace();
-      }
+      killTaskRunners(event.getContainers());
     }
   }
 
+  private void launchTaskRunners(SubQueryId subQueryId, Collection<Container> containers) {
+    for (Container container : containers) {
+      final ContainerProxy proxy = new ContainerProxy(container, subQueryId);
+      executorService.submit(new LaunchRunner(proxy));
+    }
+  }
+
+  private class LaunchRunner implements Runnable {
+    private final ContainerProxy proxy;
+    public LaunchRunner(ContainerProxy proxy) {
+      this.proxy = proxy;
+    }
+    @Override
+    public void run() {
+      proxy.launch();
+    }
+  }
+
+  private void killTaskRunners(Collection<Container> containers) {
+    for (Container container : containers) {
+      final ContainerProxy proxy = context.getContainer(container.getId());
+      executorService.submit(new KillRunner(proxy));
+    }
+  }
+
+  private class KillRunner implements Runnable {
+    private final ContainerProxy proxy;
+    public KillRunner(ContainerProxy proxy) {
+      this.proxy = proxy;
+    }
+
+    @Override
+    public void run() {
+      proxy.kill();
+    }
+  }
+
+
   /**
    * Lock this on initialClasspath so that there is only one fork in the AM for
    * getting the initial class-path. TODO: We already construct
@@ -248,67 +277,6 @@ public class TaskRunnerLauncherImpl extends AbstractService implements TaskRunne
     return ctx;
   }
 
-
-  public ContainerLaunchContext createContainerLaunchContext(TaskRunnerLaunchEvent event) {
-    synchronized (commonContainerSpecLock) {
-      if (commonContainerSpec == null) {
-        commonContainerSpec = createCommonContainerLaunchContext();
-      }
-    }
-
-    // Setup environment by cloning from common env.
-    Map<String, String> env = commonContainerSpec.getEnvironment();
-    Map<String, String> myEnv = new HashMap<String, String>(env.size());
-    myEnv.putAll(env);
-
-    // Duplicate the ByteBuffers for access by multiple containers.
-    Map<String, ByteBuffer> myServiceData = new HashMap<String, ByteBuffer>();
-    for (Map.Entry<String, ByteBuffer> entry : commonContainerSpec
-        .getServiceData().entrySet()) {
-      myServiceData.put(entry.getKey(), entry.getValue().duplicate());
-    }
-
-    ////////////////////////////////////////////////////////////////////////////
-    // Set the local resources
-    ////////////////////////////////////////////////////////////////////////////
-    // Set the necessary command to execute the application master
-    Vector<CharSequence> vargs = new Vector<CharSequence>(30);
-
-    // Set java executable command
-    //LOG.info("Setting up app master command");
-    vargs.add("${JAVA_HOME}" + "/bin/java");
-    // Set Xmx based on am memory size
-    vargs.add("-Xmx2000m");
-    // Set Remote Debugging
-    //if (!context.getQuery().getSubQuery(event.getSubQueryId()).isLeafQuery()) {
-    //vargs.add("-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5005");
-    //}
-    // Set class name
-    vargs.add("tajo.worker.TaskRunner");
-    vargs.add(taskListenerHost); // tasklistener hostname
-    vargs.add(String.valueOf(taskListenerPort)); // tasklistener hostname
-    vargs.add(event.getSubQueryId().toString()); // subqueryId
-    vargs.add(event.getContainerMgrAddress()); // nodeId
-    vargs.add(event.getContainerId().toString()); // containerId
-
-    vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
-    vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
-
-    // Get final commmand
-    StringBuilder command = new StringBuilder();
-    for (CharSequence str : vargs) {
-      command.append(str).append(" ");
-    }
-
-    LOG.info("Completed setting up taskrunner command " + command.toString());
-    List<String> commands = new ArrayList<String>();
-    commands.add(command.toString());
-
-    return BuilderUtils.newContainerLaunchContext(event.getContainerId(), commonContainerSpec.getUser(),
-        event.getCapability(), commonContainerSpec.getLocalResources(), myEnv, commands, myServiceData,
-        null, new HashMap<ApplicationAccessType, String>());
-  }
-
   protected ContainerManager getCMProxy(ContainerId containerID,
                                         final String containerManagerBindAddr,
                                         ContainerToken containerToken)
@@ -369,21 +337,25 @@ public class TaskRunnerLauncherImpl extends AbstractService implements TaskRunne
     PREP, FAILED, RUNNING, DONE, KILLED_BEFORE_LAUNCH
   }
 
-  public class Container {
+  public class ContainerProxy {
     private ContainerState state;
     // store enough information to be able to cleanup the container
+    private Container container;
     private ContainerId containerID;
     final private String containerMgrAddress;
     private ContainerToken containerToken;
     private String hostName;
     private int port = -1;
+    private final SubQueryId subQueryId;
 
-    public Container(ContainerId containerID,
-                     String containerMgrAddress, ContainerToken containerToken) {
+    public ContainerProxy(Container container, SubQueryId subQueryId) {
       this.state = ContainerState.PREP;
-      this.containerMgrAddress = containerMgrAddress;
-      this.containerID = containerID;
-      this.containerToken = containerToken;
+      this.container = container;
+      this.containerID = container.getId();
+      NodeId nodeId = container.getNodeId();
+      this.containerMgrAddress = nodeId.getHost() + ":" + nodeId.getPort();;
+      this.containerToken = container.getContainerToken();
+      this.subQueryId = subQueryId;
     }
 
     public synchronized boolean isCompletelyDone() {
@@ -391,12 +363,11 @@ public class TaskRunnerLauncherImpl extends AbstractService implements TaskRunne
     }
 
     @SuppressWarnings("unchecked")
-    public synchronized void launch(TaskRunnerLaunchEvent event) {
-      LOG.info("Launching Container with Id: " + event.getContainerId());
+    public synchronized void launch() {
+      LOG.info("Launching Container with Id: " + containerID);
       if(this.state == ContainerState.KILLED_BEFORE_LAUNCH) {
         state = ContainerState.DONE;
-        LOG.error("Container (" + event.getContainerId()
-            + " was killed before it was launched");
+        LOG.error("Container (" + containerID + " was killed before it was launched");
         return;
       }
 
@@ -407,8 +378,7 @@ public class TaskRunnerLauncherImpl extends AbstractService implements TaskRunne
             containerToken);
 
         // Construct the actual Container
-        ContainerLaunchContext containerLaunchContext =
-            createContainerLaunchContext(event);
+        ContainerLaunchContext containerLaunchContext = createContainerLaunchContext();
 
         // Now launch the actual container
         StartContainerRequest startRequest = Records
@@ -440,7 +410,7 @@ public class TaskRunnerLauncherImpl extends AbstractService implements TaskRunne
         context.getEventHandler().handle(new QueryEvent(context.getQueryId(), QueryEventType.INIT_COMPLETED));
 
         this.state = ContainerState.RUNNING;
-        this.hostName = event.getContainerMgrAddress().split(":")[0];
+        this.hostName = containerMgrAddress.split(":")[0];
         context.addContainer(containerID, this);
       } catch (Throwable t) {
         String message = "Container launch failed for " + containerID + " : "
@@ -454,7 +424,6 @@ public class TaskRunnerLauncherImpl extends AbstractService implements TaskRunne
       }
     }
 
-    @SuppressWarnings("unchecked")
     public synchronized void kill() {
 
       if(isCompletelyDone()) {
@@ -500,6 +469,66 @@ public class TaskRunnerLauncherImpl extends AbstractService implements TaskRunne
       }
     }
 
+    public ContainerLaunchContext createContainerLaunchContext() {
+      synchronized (commonContainerSpecLock) {
+        if (commonContainerSpec == null) {
+          commonContainerSpec = createCommonContainerLaunchContext();
+        }
+      }
+
+      // Setup environment by cloning from common env.
+      Map<String, String> env = commonContainerSpec.getEnvironment();
+      Map<String, String> myEnv = new HashMap<String, String>(env.size());
+      myEnv.putAll(env);
+
+      // Duplicate the ByteBuffers for access by multiple containers.
+      Map<String, ByteBuffer> myServiceData = new HashMap<String, ByteBuffer>();
+      for (Map.Entry<String, ByteBuffer> entry : commonContainerSpec
+          .getServiceData().entrySet()) {
+        myServiceData.put(entry.getKey(), entry.getValue().duplicate());
+      }
+
+      ////////////////////////////////////////////////////////////////////////////
+      // Set the local resources
+      ////////////////////////////////////////////////////////////////////////////
+      // Set the necessary command to execute the application master
+      Vector<CharSequence> vargs = new Vector<CharSequence>(30);
+
+      // Set java executable command
+      //LOG.info("Setting up app master command");
+      vargs.add("${JAVA_HOME}" + "/bin/java");
+      // Set Xmx based on am memory size
+      vargs.add("-Xmx2000m");
+      // Set Remote Debugging
+      //if (!context.getQuery().getSubQuery(event.getSubQueryId()).isLeafQuery()) {
+      //vargs.add("-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5005");
+      //}
+      // Set class name
+      vargs.add("tajo.worker.TaskRunner");
+      vargs.add(taskListenerHost); // tasklistener hostname
+      vargs.add(String.valueOf(taskListenerPort)); // tasklistener port
+      vargs.add(subQueryId.toString()); // subqueryId
+      vargs.add(containerMgrAddress); // nodeId
+      vargs.add(containerID.toString()); // containerId
+
+      vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
+      vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
+
+      // Get final command
+      StringBuilder command = new StringBuilder();
+      for (CharSequence str : vargs) {
+        command.append(str).append(" ");
+      }
+
+      LOG.info("Completed setting up TaskRunner command " + command.toString());
+      List<String> commands = new ArrayList<String>();
+      commands.add(command.toString());
+
+      return BuilderUtils.newContainerLaunchContext(containerID, commonContainerSpec.getUser(),
+          container.getResource(), commonContainerSpec.getLocalResources(), myEnv, commands,
+          myServiceData, null, new HashMap<ApplicationAccessType, String>());
+    }
+
     public String getHostName() {
       return this.hostName;
     }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9ca8d365/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskSchedulerImpl.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskSchedulerImpl.java
index 816e285..aae88a6 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskSchedulerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/TaskSchedulerImpl.java
@@ -34,7 +34,7 @@ import tajo.engine.planner.logical.ScanNode;
 import tajo.engine.query.QueryUnitRequestImpl;
 import tajo.ipc.protocolrecords.QueryUnitRequest;
 import tajo.master.QueryMaster.QueryContext;
-import tajo.master.TaskRunnerLauncherImpl.Container;
+import tajo.master.TaskRunnerLauncherImpl.ContainerProxy;
 import tajo.master.event.TaskAttemptAssignedEvent;
 import tajo.master.event.TaskRequestEvent;
 import tajo.master.event.TaskRequestEvent.TaskRequestEventType;
@@ -300,7 +300,7 @@ public class TaskSchedulerImpl extends AbstractService
       TaskRequestEvent taskRequest;
       while (it.hasNext() && leafTasks.size() > 0) {
         taskRequest = it.next();
-        Container container = context.getContainer(taskRequest.getContainerId());
+        ContainerProxy container = context.getContainer(taskRequest.getContainerId());
         String hostName = container.getHostName();
 
         QueryUnitAttemptId attemptId = null;
@@ -408,7 +408,7 @@ public class TaskSchedulerImpl extends AbstractService
             }
           }
 
-          Container container = context.getContainer(
+          ContainerProxy container = context.getContainer(
               taskRequest.getContainerId());
           context.getEventHandler().handle(new TaskAttemptAssignedEvent(attemptId,
               taskRequest.getContainerId(), container.getHostName(), container.getPullServerPort()));

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9ca8d365/tajo-core/tajo-core-backend/src/main/java/tajo/master/event/TaskRunnerLaunchEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/event/TaskRunnerLaunchEvent.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/event/TaskRunnerLaunchEvent.java
deleted file mode 100644
index 2406b48..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/event/TaskRunnerLaunchEvent.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.master.event;
-
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.Resource;
-import tajo.SubQueryId;
-import tajo.master.TaskRunnerEvent;
-
-public class TaskRunnerLaunchEvent extends TaskRunnerEvent {
-
-  private final SubQueryId subQueryId;
-  private final Resource resource;
-
-  public TaskRunnerLaunchEvent(SubQueryId subQueryId,
-                               Container container,
-                               Resource resource) {
-    super(EventType.CONTAINER_REMOTE_LAUNCH, subQueryId, container);
-    this.subQueryId = subQueryId;
-    this.resource = resource;
-  }
-
-  public SubQueryId getSubQueryId() {
-    return this.subQueryId;
-  }
-
-  public Resource getCapability() {
-    return resource;
-  }
-
-  @Override
-  public String toString() {
-    return super.toString() + " for container " + getContainerId()
-        + " taskAttempt " + subQueryId;
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/9ca8d365/tajo-core/tajo-core-backend/src/main/java/tajo/master/event/TaskRunnerStopEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/event/TaskRunnerStopEvent.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/event/TaskRunnerStopEvent.java
deleted file mode 100644
index 7e5a579..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/event/TaskRunnerStopEvent.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.master.event;
-
-import org.apache.hadoop.yarn.api.records.Container;
-import tajo.SubQueryId;
-import tajo.master.TaskRunnerEvent;
-
-public class TaskRunnerStopEvent extends TaskRunnerEvent {
-  public TaskRunnerStopEvent(SubQueryId subQueryId, Container container) {
-    super(EventType.CONTAINER_REMOTE_CLEANUP, subQueryId, container);
-  }
-}