You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/04/22 10:10:33 UTC

git commit: TAJO-783: Remove yarn-related code from tajo-core. (hyunsik)

Repository: tajo
Updated Branches:
  refs/heads/master 9cd8dac35 -> 882f92c6d


TAJO-783: Remove yarn-related code from tajo-core. (hyunsik)


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/882f92c6
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/882f92c6
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/882f92c6

Branch: refs/heads/master
Commit: 882f92c6d74818eee7f086a7c5f9467b44101abf
Parents: 9cd8dac
Author: Hyunsik Choi <hy...@apache.org>
Authored: Tue Apr 22 17:04:39 2014 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Tue Apr 22 17:10:15 2014 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |   5 +
 tajo-core/pom.xml                               |  26 --
 .../tajo/master/DefaultTaskScheduler.java       |   4 +-
 .../apache/tajo/master/LazyTaskScheduler.java   |   8 +-
 .../apache/tajo/master/YarnContainerProxy.java  | 414 -------------------
 .../tajo/master/YarnTaskRunnerLauncherImpl.java | 200 ---------
 .../master/querymaster/QueryMasterTask.java     |   9 +-
 .../master/rm/YarnRMContainerAllocator.java     | 237 -----------
 .../tajo/master/rm/YarnTajoResourceManager.java | 349 ----------------
 .../tajo/worker/YarnResourceAllocator.java      | 117 ------
 .../org/apache/tajo/TajoTestingCluster.java     |   8 +-
 tajo-maven-plugins/pom.xml                      |   1 +
 tajo-project/pom.xml                            |   6 +
 13 files changed, 25 insertions(+), 1359 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/882f92c6/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a193a1e..de78d31 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,11 @@ Release 0.9.0 - unreleased
 
     TAJO-769: A minor improvements for HCatalogStore (Fengdong Yu via hyunsik)
 
+
+  SUB TASKS
+
+    TAJO-783: Remove yarn-related code from tajo-core. (hyunsik)
+
 Release 0.8.0 - unreleased
 
   NEW FEATURES

http://git-wip-us.apache.org/repos/asf/tajo/blob/882f92c6/tajo-core/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-core/pom.xml b/tajo-core/pom.xml
index 2f38e92..f90e089 100644
--- a/tajo-core/pom.xml
+++ b/tajo-core/pom.xml
@@ -303,35 +303,9 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-yarn-client</artifactId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-yarn-server-nodemanager</artifactId>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-yarn-server-common</artifactId>
       <scope>provided</scope>
     </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-mapreduce-client-core</artifactId>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-yarn-server-tests</artifactId>
-      <type>test-jar</type>
-      <scope>test</scope>
-    </dependency>
 
     <dependency>
       <groupId>com.google.protobuf</groupId>

http://git-wip-us.apache.org/repos/asf/tajo/blob/882f92c6/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
index 409a1b1..9978670 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
@@ -269,12 +269,12 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
       }
       int qSize = taskRequestQueue.size();
       if (qSize != 0 && qSize % 1000 == 0) {
-        LOG.info("Size of event-queue in YarnRMContainerAllocator is " + qSize);
+        LOG.info("Size of event-queue in DefaultTaskScheduler is " + qSize);
       }
       int remCapacity = taskRequestQueue.remainingCapacity();
       if (remCapacity < 1000) {
         LOG.warn("Very low remaining capacity in the event-queue "
-            + "of YarnRMContainerAllocator: " + remCapacity);
+            + "of DefaultTaskScheduler: " + remCapacity);
       }
 
       taskRequestQueue.add(event);

http://git-wip-us.apache.org/repos/asf/tajo/blob/882f92c6/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
index 434ea22..dd82f28 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
@@ -186,12 +186,12 @@ public class LazyTaskScheduler extends AbstractTaskScheduler {
   public void handle(TaskSchedulerEvent event) {
     int qSize = eventQueue.size();
     if (qSize != 0 && qSize % 1000 == 0) {
-      LOG.info("Size of event-queue in YarnRMContainerAllocator is " + qSize);
+      LOG.info("Size of event-queue in DefaultTaskScheduler is " + qSize);
     }
     int remCapacity = eventQueue.remainingCapacity();
     if (remCapacity < 1000) {
       LOG.warn("Very low remaining capacity in the event-queue "
-          + "of YarnRMContainerAllocator: " + remCapacity);
+          + "of DefaultTaskScheduler: " + remCapacity);
     }
 
     if (event.getType() == EventType.T_SCHEDULE) {
@@ -305,12 +305,12 @@ public class LazyTaskScheduler extends AbstractTaskScheduler {
       }
       int qSize = taskRequestQueue.size();
       if (qSize != 0 && qSize % 1000 == 0) {
-        LOG.info("Size of event-queue in YarnRMContainerAllocator is " + qSize);
+        LOG.info("Size of event-queue in DefaultTaskScheduler is " + qSize);
       }
       int remCapacity = taskRequestQueue.remainingCapacity();
       if (remCapacity < 1000) {
         LOG.warn("Very low remaining capacity in the event-queue "
-            + "of YarnRMContainerAllocator: " + remCapacity);
+            + "of DefaultTaskScheduler: " + remCapacity);
       }
 
       taskRequestQueue.add(event);

http://git-wip-us.apache.org/repos/asf/tajo/blob/882f92c6/tajo-core/src/main/java/org/apache/tajo/master/YarnContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/YarnContainerProxy.java b/tajo-core/src/main/java/org/apache/tajo/master/YarnContainerProxy.java
deleted file mode 100644
index 4f178fb..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/YarnContainerProxy.java
+++ /dev/null
@@ -1,414 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
-import org.apache.hadoop.yarn.api.records.*;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
-import org.apache.hadoop.yarn.server.utils.BuilderUtils;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.util.Records;
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.TajoConstants;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.master.querymaster.QueryMasterTask;
-import org.apache.tajo.pullserver.PullServerAuxService;
-import org.apache.tajo.worker.TajoWorker;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.security.PrivilegedAction;
-import java.util.*;
-
-public class YarnContainerProxy extends ContainerProxy {
-  private final static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
-
-  protected final YarnRPC yarnRPC;
-  final protected String containerMgrAddress;
-  protected Token containerToken;
-
-  public YarnContainerProxy(QueryMasterTask.QueryMasterTaskContext context, Configuration conf, YarnRPC yarnRPC,
-                                  Container container, ExecutionBlockId executionBlockId) {
-    super(context, conf, executionBlockId, container);
-    this.yarnRPC = yarnRPC;
-
-    NodeId nodeId = container.getNodeId();
-    this.containerMgrAddress = nodeId.getHost() + ":" + nodeId.getPort();
-    this.containerToken = container.getContainerToken();
-  }
-
-  protected ContainerManagementProtocol getCMProxy(ContainerId containerID,
-                                                   final String containerManagerBindAddr,
-                                                   Token containerToken)
-      throws IOException {
-    String [] hosts = containerManagerBindAddr.split(":");
-    final InetSocketAddress cmAddr =
-        new InetSocketAddress(hosts[0], Integer.parseInt(hosts[1]));
-    UserGroupInformation user = UserGroupInformation.getCurrentUser();
-
-    if (UserGroupInformation.isSecurityEnabled()) {
-      org.apache.hadoop.security.token.Token<ContainerTokenIdentifier> token =
-          ConverterUtils.convertFromYarn(containerToken, cmAddr);
-      // the user in createRemoteUser in this context has to be ContainerID
-      user = UserGroupInformation.createRemoteUser(containerID.toString());
-      user.addToken(token);
-    }
-
-    ContainerManagementProtocol proxy = user.doAs(new PrivilegedAction<ContainerManagementProtocol>() {
-      @Override
-      public ContainerManagementProtocol run() {
-        return (ContainerManagementProtocol) yarnRPC.getProxy(ContainerManagementProtocol.class,
-            cmAddr, conf);
-      }
-    });
-
-    return proxy;
-  }
-
-  @Override
-  @SuppressWarnings("unchecked")
-  public synchronized void launch(ContainerLaunchContext commonContainerLaunchContext) {
-    LOG.info("Launching Container with Id: " + containerID);
-    if(this.state == ContainerState.KILLED_BEFORE_LAUNCH) {
-      state = ContainerState.DONE;
-      LOG.error("Container (" + containerID + " was killed before it was launched");
-      return;
-    }
-
-    ContainerManagementProtocol proxy = null;
-    try {
-
-      proxy = getCMProxy(containerID, containerMgrAddress,
-          containerToken);
-
-      // Construct the actual Container
-      ContainerLaunchContext containerLaunchContext = createContainerLaunchContext(commonContainerLaunchContext);
-
-      // Now launch the actual container
-      List<StartContainerRequest> startRequestList = new ArrayList<StartContainerRequest>();
-      StartContainerRequest startRequest = Records
-          .newRecord(StartContainerRequest.class);
-      startRequest.setContainerLaunchContext(containerLaunchContext);
-      startRequestList.add(startRequest);
-      StartContainersRequest startRequests = Records.newRecord(StartContainersRequest.class);
-      startRequests.setStartContainerRequests(startRequestList);
-      StartContainersResponse response = proxy.startContainers(startRequests);
-
-      ByteBuffer portInfo = response.getAllServicesMetaData().get(PullServerAuxService.PULLSERVER_SERVICEID);
-
-      if(portInfo != null) {
-        port = PullServerAuxService.deserializeMetaData(portInfo);
-      }
-
-      LOG.info("PullServer port returned by ContainerManager for "
-          + containerID + " : " + port);
-
-      if(port < 0) {
-        this.state = ContainerState.FAILED;
-        throw new IllegalStateException("Invalid shuffle port number "
-            + port + " returned for " + containerID);
-      }
-
-      this.state = ContainerState.RUNNING;
-      this.hostName = containerMgrAddress.split(":")[0];
-      context.getResourceAllocator().addContainer(containerID, this);
-    } catch (Throwable t) {
-      String message = "Container launch failed for " + containerID + " : "
-          + StringUtils.stringifyException(t);
-      this.state = ContainerState.FAILED;
-      LOG.error(message);
-    } finally {
-      if (proxy != null) {
-        yarnRPC.stopProxy(proxy, conf);
-      }
-    }
-  }
-
-
-  public ContainerLaunchContext createContainerLaunchContext(ContainerLaunchContext commonContainerLaunchContext) {
-    // Setup environment by cloning from common env.
-    Map<String, String> env = commonContainerLaunchContext.getEnvironment();
-    Map<String, String> myEnv = new HashMap<String, String>(env.size());
-    myEnv.putAll(env);
-
-    // Duplicate the ByteBuffers for access by multiple containers.
-    Map<String, ByteBuffer> myServiceData = new HashMap<String, ByteBuffer>();
-    for (Map.Entry<String, ByteBuffer> entry : commonContainerLaunchContext.getServiceData().entrySet()) {
-      myServiceData.put(entry.getKey(), entry.getValue().duplicate());
-    }
-
-    ////////////////////////////////////////////////////////////////////////////
-    // Set the local resources
-    ////////////////////////////////////////////////////////////////////////////
-    // Set the necessary command to execute the application master
-    Vector<CharSequence> vargs = new Vector<CharSequence>(30);
-
-    // Set java executable command
-    //LOG.info("Setting up app master command");
-    vargs.add("${JAVA_HOME}" + "/bin/java");
-    // Set Xmx based on am memory size
-    vargs.add("-Xmx2000m");
-    // Set Remote Debugging
-    //if (!context.getQuery().getSubQuery(event.getExecutionBlockId()).isLeafQuery()) {
-    //vargs.add("-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5005");
-    //}
-    // Set class name
-    //vargs.add(getRunnerClass());
-    vargs.add(TajoWorker.class.getCanonicalName());
-    vargs.add("tr");     //workerMode
-    vargs.add(getId()); // subqueryId
-    vargs.add(containerMgrAddress); // nodeId
-    vargs.add(containerID.toString()); // containerId
-    Vector<CharSequence> taskParams = getTaskParams();
-    if(taskParams != null) {
-      vargs.addAll(taskParams);
-    }
-
-    vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
-    vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
-
-    // Get final commmand
-    StringBuilder command = new StringBuilder();
-    for (CharSequence str : vargs) {
-      command.append(str).append(" ");
-    }
-
-    LOG.info("Completed setting up TaskRunner command " + command.toString());
-    List<String> commands = new ArrayList<String>();
-    commands.add(command.toString());
-
-    return BuilderUtils.newContainerLaunchContext(commonContainerLaunchContext.getLocalResources(),
-        myEnv,
-        commands,
-        myServiceData,
-        null,
-        new HashMap<ApplicationAccessType, String>());
-  }
-
-  public static ContainerLaunchContext createCommonContainerLaunchContext(Configuration config,
-                                                                          String queryId, boolean isMaster) {
-    TajoConf conf = (TajoConf)config;
-
-    ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
-
-    try {
-      ByteBuffer userToken = ByteBuffer.wrap(UserGroupInformation.getCurrentUser().getShortUserName().getBytes());
-      ctx.setTokens(userToken);
-    } catch (IOException e) {
-      e.printStackTrace();
-    }
-
-    ////////////////////////////////////////////////////////////////////////////
-    // Set the env variables to be setup
-    ////////////////////////////////////////////////////////////////////////////
-    LOG.info("Set the environment for the application master");
-
-    Map<String, String> environment = new HashMap<String, String>();
-    //String initialClassPath = getInitialClasspath(conf);
-    environment.put(ApplicationConstants.Environment.SHELL.name(), "/bin/bash");
-    if(System.getenv(ApplicationConstants.Environment.JAVA_HOME.name()) != null) {
-      environment.put(ApplicationConstants.Environment.JAVA_HOME.name(), System.getenv(ApplicationConstants.Environment.JAVA_HOME.name()));
-    }
-
-    // TODO - to be improved with org.apache.tajo.sh shell script
-    Properties prop = System.getProperties();
-
-    if (prop.getProperty("tajo.test", "FALSE").equalsIgnoreCase("TRUE") ||
-        (System.getenv("tajo.test") != null && System.getenv("tajo.test").equalsIgnoreCase("TRUE"))) {
-      LOG.info("tajo.test is TRUE");
-      environment.put(ApplicationConstants.Environment.CLASSPATH.name(), prop.getProperty("java.class.path", null));
-      environment.put("tajo.test", "TRUE");
-    } else {
-      // Add AppMaster.jar location to classpath
-      // At some point we should not be required to add
-      // the hadoop specific classpaths to the env.
-      // It should be provided out of the box.
-      // For now setting all required classpaths including
-      // the classpath to "." for the application jar
-      StringBuilder classPathEnv = new StringBuilder("./");
-      //for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH)) {
-      for (String c : YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH) {
-        classPathEnv.append(':');
-        classPathEnv.append(c.trim());
-      }
-
-      classPathEnv.append(":" + System.getenv("TAJO_BASE_CLASSPATH"));
-      classPathEnv.append(":./log4j.properties:./*");
-      if(System.getenv("HADOOP_HOME") != null) {
-        environment.put("HADOOP_HOME", System.getenv("HADOOP_HOME"));
-        environment.put(
-            ApplicationConstants.Environment.HADOOP_COMMON_HOME.name(),
-            System.getenv("HADOOP_HOME"));
-        environment.put(
-            ApplicationConstants.Environment.HADOOP_HDFS_HOME.name(),
-            System.getenv("HADOOP_HOME"));
-        environment.put(
-            ApplicationConstants.Environment.HADOOP_YARN_HOME.name(),
-            System.getenv("HADOOP_HOME"));
-      }
-
-      if(System.getenv("TAJO_BASE_CLASSPATH") != null) {
-        environment.put("TAJO_BASE_CLASSPATH", System.getenv("TAJO_BASE_CLASSPATH"));
-      }
-      environment.put(ApplicationConstants.Environment.CLASSPATH.name(), classPathEnv.toString());
-    }
-
-    ctx.setEnvironment(environment);
-
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("=================================================");
-      for(Map.Entry<String, String> entry: environment.entrySet()) {
-        LOG.debug(entry.getKey() + "=" + entry.getValue());
-      }
-      LOG.debug("=================================================");
-    }
-    ////////////////////////////////////////////////////////////////////////////
-    // Set the local resources
-    ////////////////////////////////////////////////////////////////////////////
-    Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
-    LOG.info("defaultFS: " + conf.get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY));
-
-    try {
-      FileSystem fs = FileSystem.get(conf);
-      FileContext fsCtx = FileContext.getFileContext(conf);
-      Path systemConfPath = TajoConf.getSystemConfPath(conf);
-      if (!fs.exists(systemConfPath)) {
-        LOG.error("system_conf.xml (" + systemConfPath.toString() + ") Not Found");
-      }
-      LocalResource systemConfResource = createApplicationResource(fsCtx, systemConfPath, LocalResourceType.FILE);
-      localResources.put(TajoConstants.SYSTEM_CONF_FILENAME, systemConfResource);
-      ctx.setLocalResources(localResources);
-    } catch (IOException e) {
-      LOG.error(e.getMessage(), e);
-    }
-
-    Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
-    try {
-      serviceData.put(PullServerAuxService.PULLSERVER_SERVICEID, PullServerAuxService.serializeMetaData(0));
-    } catch (IOException ioe) {
-      LOG.error(ioe);
-    }
-    ctx.setServiceData(serviceData);
-
-    return ctx;
-  }
-
-  private static LocalResource createApplicationResource(FileContext fs,
-                                                         Path p, LocalResourceType type)
-      throws IOException {
-    LocalResource rsrc = recordFactory.newRecordInstance(LocalResource.class);
-    FileStatus rsrcStat = fs.getFileStatus(p);
-    rsrc.setResource(ConverterUtils.getYarnUrlFromPath(fs
-        .getDefaultFileSystem().resolvePath(rsrcStat.getPath())));
-    rsrc.setSize(rsrcStat.getLen());
-    rsrc.setTimestamp(rsrcStat.getModificationTime());
-    rsrc.setType(type);
-    rsrc.setVisibility(LocalResourceVisibility.APPLICATION);
-    return rsrc;
-  }
-
-  private static void writeConf(Configuration conf, Path queryConfFile)
-      throws IOException {
-    // Write job file to Tajo's fs
-    FileSystem fs = queryConfFile.getFileSystem(conf);
-    FSDataOutputStream out =
-        FileSystem.create(fs, queryConfFile,
-            new FsPermission(QUERYCONF_FILE_PERMISSION));
-    try {
-      conf.writeXml(out);
-    } finally {
-      out.close();
-    }
-  }
-
-  @Override
-  public synchronized void stopContainer() {
-
-    if(isCompletelyDone()) {
-      return;
-    }
-    if(this.state == ContainerState.PREP) {
-      this.state = ContainerState.KILLED_BEFORE_LAUNCH;
-    } else {
-      LOG.info("KILLING " + containerID);
-
-      ContainerManagementProtocol proxy = null;
-      try {
-        proxy = getCMProxy(this.containerID, this.containerMgrAddress,
-            this.containerToken);
-
-        // kill the remote container if already launched
-        List<ContainerId> willBeStopedIds = new ArrayList<ContainerId>();
-        willBeStopedIds.add(this.containerID);
-        StopContainersRequest stopRequests = Records.newRecord(StopContainersRequest.class);
-        stopRequests.setContainerIds(willBeStopedIds);
-        proxy.stopContainers(stopRequests);
-        // If stopContainer returns without an error, assuming the stop made
-        // it over to the NodeManager.
-//          context.getEventHandler().handle(
-//              new AMContainerEvent(containerID, AMContainerEventType.C_NM_STOP_SENT));
-        context.getResourceAllocator().removeContainer(containerID);
-      } catch (Throwable t) {
-
-        // ignore the cleanup failure
-        String message = "cleanup failed for container "
-            + this.containerID + " : "
-            + StringUtils.stringifyException(t);
-//          context.getEventHandler().handle(
-//              new AMContainerEventStopFailed(containerID, message));
-        LOG.warn(message);
-        this.state = ContainerState.DONE;
-        return;
-      } finally {
-        if (proxy != null) {
-          yarnRPC.stopProxy(proxy, conf);
-        }
-      }
-      this.state = ContainerState.DONE;
-    }
-  }
-
-  protected Vector<CharSequence> getTaskParams() {
-    String queryMasterHost = context.getQueryMasterContext().getWorkerContext()
-        .getTajoWorkerManagerService().getBindAddr().getHostName();
-    int queryMasterPort = context.getQueryMasterContext().getWorkerContext()
-        .getTajoWorkerManagerService().getBindAddr().getPort();
-
-    Vector<CharSequence> taskParams = new Vector<CharSequence>();
-    taskParams.add(queryMasterHost); // queryMaster hostname
-    taskParams.add(String.valueOf(queryMasterPort)); // queryMaster port
-    taskParams.add(context.getStagingDir().toString());
-    return taskParams;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/882f92c6/tajo-core/src/main/java/org/apache/tajo/master/YarnTaskRunnerLauncherImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/YarnTaskRunnerLauncherImpl.java b/tajo-core/src/main/java/org/apache/tajo/master/YarnTaskRunnerLauncherImpl.java
deleted file mode 100644
index 8b18b5a..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/YarnTaskRunnerLauncherImpl.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.master.TaskRunnerGroupEvent.EventType;
-import org.apache.tajo.master.querymaster.QueryMasterTask;
-
-import java.util.*;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-public class YarnTaskRunnerLauncherImpl extends AbstractService implements TaskRunnerLauncher {
-
-  /** Class Logger */
-  private static final Log LOG = LogFactory.getLog(YarnTaskRunnerLauncherImpl.class);
-  //private final YarnRPC yarnRPC;
-  private final static RecordFactory recordFactory =
-      RecordFactoryProvider.getRecordFactory(null);
-  private QueryMasterTask.QueryMasterTaskContext context;
-
-  // For ContainerLauncherSpec
-  private static AtomicBoolean initialClasspathFlag = new AtomicBoolean();
-  private static String initialClasspath = null;
-  private static final Object classpathLock = new Object();
-  private ContainerLaunchContext commonContainerSpec = null;
-
-  final public static FsPermission QUERYCONF_FILE_PERMISSION =
-      FsPermission.createImmutable((short) 0644); // rw-r--r--
-
-  /** for launching TaskRunners in parallel */
-  private final ExecutorService executorService;
-
-  private YarnRPC yarnRPC;
-
-  public YarnTaskRunnerLauncherImpl(QueryMasterTask.QueryMasterTaskContext context, YarnRPC yarnRPC) {
-    super(YarnTaskRunnerLauncherImpl.class.getName());
-    this.context = context;
-    this.yarnRPC = yarnRPC;
-    executorService = Executors.newFixedThreadPool(
-        context.getConf().getIntVar(TajoConf.ConfVars.YARN_RM_TASKRUNNER_LAUNCH_PARALLEL_NUM));
-  }
-
-  public void start() {
-    super.start();
-  }
-
-  public void stop() {
-    executorService.shutdownNow();
-
-    Map<ContainerId, ContainerProxy> containers = context.getResourceAllocator().getContainers();
-    List<ContainerProxy> list = new ArrayList<ContainerProxy>(containers.values());
-    for(ContainerProxy eachProxy:  list) {
-      try {
-        eachProxy.stopContainer();
-      } catch (Exception e) {
-      }
-    }
-    super.stop();
-  }
-
-  @Override
-  public void handle(TaskRunnerGroupEvent event) {
-    if (event.getType() == EventType.CONTAINER_REMOTE_LAUNCH) {
-     launchTaskRunners(event.executionBlockId, event.getContainers());
-    } else if (event.getType() == EventType.CONTAINER_REMOTE_CLEANUP) {
-      stopTaskRunners(event.getContainers());
-    }
-  }
-
-  private void launchTaskRunners(ExecutionBlockId executionBlockId, Collection<Container> containers) {
-    commonContainerSpec = YarnContainerProxy.createCommonContainerLaunchContext(getConfig(),
-        executionBlockId.getQueryId().toString(), false);
-    for (Container container : containers) {
-      final ContainerProxy proxy = new YarnContainerProxy(context, getConfig(),
-          yarnRPC, container, executionBlockId);
-      executorService.submit(new LaunchRunner(container.getId(), proxy));
-    }
-  }
-
-  protected class LaunchRunner implements Runnable {
-    private final ContainerProxy proxy;
-    private final ContainerId id;
-    public LaunchRunner(ContainerId id, ContainerProxy proxy) {
-      this.proxy = proxy;
-      this.id = id;
-    }
-    @Override
-    public void run() {
-      proxy.launch(commonContainerSpec);
-      LOG.info("ContainerProxy started:" + id);
-    }
-  }
-
-  private void stopTaskRunners(Collection<Container> containers) {
-    for (Container container : containers) {
-      final ContainerProxy proxy = context.getResourceAllocator().getContainer(container.getId());
-      executorService.submit(new StopContainerRunner(container.getId(), proxy));
-    }
-  }
-
-  private static class StopContainerRunner implements Runnable {
-    private final ContainerProxy proxy;
-    private final ContainerId id;
-    public StopContainerRunner(ContainerId id, ContainerProxy proxy) {
-      this.id = id;
-      this.proxy = proxy;
-    }
-
-    @Override
-    public void run() {
-      proxy.stopContainer();
-      LOG.info("ContainerProxy stopped:" + id);
-    }
-  }
-
-
-  /**
-   * Lock this on initialClasspath so that there is only one fork in the AM for
-   * getting the initial class-path. TODO: We already construct
-   * a parent CLC and use it for all the containers, so this should go away
-   * once the mr-generated-classpath stuff is gone.
-   */
-  private static String getInitialClasspath(Configuration conf) {
-    synchronized (classpathLock) {
-      if (initialClasspathFlag.get()) {
-        return initialClasspath;
-      }
-      Map<String, String> env = new HashMap<String, String>();
-
-      initialClasspath = env.get(Environment.CLASSPATH.name());
-      initialClasspathFlag.set(true);
-      return initialClasspath;
-    }
-  }
-
-//  public class TaskRunnerContainerProxy extends ContainerProxy {
-//    private final ExecutionBlockId executionBlockId;
-//
-//    public TaskRunnerContainerProxy(QueryMasterTask.QueryContext context, Configuration conf, YarnRPC yarnRPC,
-//                                    Container container, ExecutionBlockId executionBlockId) {
-//      super(context, conf, yarnRPC, container);
-//      this.executionBlockId = executionBlockId;
-//    }
-//
-//    @Override
-//    protected void containerStarted() {
-//      context.getEventHandler().handle(new QueryEvent(context.getQueryId(), QueryEventType.INIT_COMPLETED));
-//    }
-//
-//    @Override
-//    protected String getId() {
-//      return executionBlockId.toString();
-//    }
-//
-//    @Override
-//    protected String getRunnerClass() {
-//      return TaskRunner.class.getCanonicalSignature();
-//    }
-//
-//    @Override
-//    protected Vector<CharSequence> getTaskParams() {
-//      Vector<CharSequence> taskParams = new Vector<CharSequence>();
-//      taskParams.add(queryMasterHost); // queryMaster hostname
-//      taskParams.add(String.valueOf(queryMasterPort)); // queryMaster port
-//
-//      return taskParams;
-//    }
-//  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/882f92c6/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
index 23b0def..39ea430 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
@@ -37,12 +37,16 @@ import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.parser.HiveQLAnalyzer;
 import org.apache.tajo.engine.parser.SQLAnalyzer;
-import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.LogicalOptimizer;
+import org.apache.tajo.engine.planner.LogicalPlan;
+import org.apache.tajo.engine.planner.LogicalPlanner;
+import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.planner.logical.LogicalNode;
 import org.apache.tajo.engine.planner.logical.NodeType;
 import org.apache.tajo.engine.planner.logical.ScanNode;
 import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.exception.UnimplementedException;
 import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.master.GlobalEngine;
 import org.apache.tajo.master.TajoAsyncDispatcher;
@@ -58,7 +62,6 @@ import org.apache.tajo.util.metrics.TajoMetrics;
 import org.apache.tajo.util.metrics.reporter.MetricsConsoleReporter;
 import org.apache.tajo.worker.AbstractResourceAllocator;
 import org.apache.tajo.worker.TajoResourceAllocator;
-import org.apache.tajo.worker.YarnResourceAllocator;
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -137,7 +140,7 @@ public class QueryMasterTask extends CompositeService {
       if(resourceManagerClassName.indexOf(TajoWorkerResourceManager.class.getName()) >= 0) {
         resourceAllocator = new TajoResourceAllocator(queryTaskContext);
       } else {
-        resourceAllocator = new YarnResourceAllocator(queryTaskContext);
+        throw new UnimplementedException(resourceManagerClassName + " is not supported yet");
       }
       addService(resourceAllocator);
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/882f92c6/tajo-core/src/main/java/org/apache/tajo/master/rm/YarnRMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/YarnRMContainerAllocator.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/YarnRMContainerAllocator.java
deleted file mode 100644
index b9e132b..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/YarnRMContainerAllocator.java
+++ /dev/null
@@ -1,237 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.rm;
-
-import com.google.common.collect.Lists;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.TajoProtos;
-import org.apache.tajo.master.event.ContainerAllocationEvent;
-import org.apache.tajo.master.event.ContainerAllocatorEventType;
-import org.apache.tajo.master.event.SubQueryContainerAllocationEvent;
-import org.apache.tajo.master.querymaster.Query;
-import org.apache.tajo.master.querymaster.QueryMasterTask;
-import org.apache.tajo.master.querymaster.SubQuery;
-import org.apache.tajo.master.querymaster.SubQueryState;
-import org.apache.tajo.util.ApplicationIdUtils;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
-public class YarnRMContainerAllocator extends AMRMClientImpl
-    implements EventHandler<ContainerAllocationEvent> {
-
-  /** Class Logger */
-  private static final Log LOG = LogFactory.getLog(YarnRMContainerAllocator.
-      class.getName());
-
-  private QueryMasterTask.QueryMasterTaskContext context;
-  private ApplicationAttemptId appAttemptId;
-  private final EventHandler eventHandler;
-
-  public YarnRMContainerAllocator(QueryMasterTask.QueryMasterTaskContext context) {
-    super();
-    this.context = context;
-    this.appAttemptId = ApplicationIdUtils.createApplicationAttemptId(context.getQueryId());
-    this.eventHandler = context.getDispatcher().getEventHandler();
-  }
-
-  public void init(Configuration conf) {
-    super.init(conf);
-  }
-
-  private static final int WAIT_INTERVAL_AVAILABLE_NODES = 500; // 0.5 second
-  public void start() {
-    super.start();
-
-    RegisterApplicationMasterResponse response;
-    try {
-      response = registerApplicationMaster("localhost", 10080, "http://localhost:1234");
-
-      // If the number of cluster nodes is ZERO, it waits for available nodes.
-      AllocateResponse allocateResponse = allocate(0.0f);
-      while(allocateResponse.getNumClusterNodes() < 1) {
-        try {
-          Thread.sleep(WAIT_INTERVAL_AVAILABLE_NODES);
-          LOG.info("Waiting for Available Cluster Nodes");
-          allocateResponse = allocate(0);
-        } catch (InterruptedException e) {
-          LOG.error(e);
-        }
-      }
-      context.getQueryMasterContext().getWorkerContext().setNumClusterNodes(allocateResponse.getNumClusterNodes());
-    } catch (IOException e) {
-      LOG.error(e);
-    } catch (YarnException e) {
-      LOG.error(e);
-    }
-
-    startAllocatorThread();
-  }
-
-  protected Thread allocatorThread;
-  private final AtomicBoolean stopped = new AtomicBoolean(false);
-  private int rmPollInterval = 100;//millis
-
-  protected void startAllocatorThread() {
-    allocatorThread = new Thread(new Runnable() {
-      @Override
-      public void run() {
-        while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
-          try {
-            try {
-              heartbeat();
-            } catch (YarnException e) {
-              LOG.error("Error communicating with RM: " + e.getMessage() , e);
-              return;
-            } catch (Exception e) {
-              LOG.error("ERROR IN CONTACTING RM. ", e);
-              // TODO: for other exceptions
-              if(stopped.get()) {
-                break;
-              }
-            }
-            Thread.sleep(rmPollInterval);
-          } catch (InterruptedException e) {
-            if (!stopped.get()) {
-              LOG.warn("Allocated thread interrupted. Returning.");
-            }
-            break;
-          }
-        }
-        LOG.info("Allocated thread stopped");
-      }
-    });
-    allocatorThread.setName("YarnRMContainerAllocator");
-    allocatorThread.start();
-  }
-
-  public void stop() {
-    if(stopped.get()) {
-      return;
-    }
-    LOG.info("un-registering ApplicationMaster(QueryMaster):" + appAttemptId);
-    stopped.set(true);
-
-    try {
-      FinalApplicationStatus status = FinalApplicationStatus.UNDEFINED;
-      Query query = context.getQuery();
-      if (query != null) {
-        TajoProtos.QueryState state = query.getState();
-        if (state == TajoProtos.QueryState.QUERY_SUCCEEDED) {
-          status = FinalApplicationStatus.SUCCEEDED;
-        } else if (state == TajoProtos.QueryState.QUERY_FAILED || state == TajoProtos.QueryState.QUERY_ERROR) {
-          status = FinalApplicationStatus.FAILED;
-        } else if (state == TajoProtos.QueryState.QUERY_ERROR) {
-          status = FinalApplicationStatus.FAILED;
-        }
-      }
-      unregisterApplicationMaster(status, "tajo query finished", null);
-    } catch (Exception e) {
-      LOG.error(e.getMessage(), e);
-    }
-
-    allocatorThread.interrupt();
-    LOG.info("un-registered ApplicationMAster(QueryMaster) stopped:" + appAttemptId);
-
-    super.stop();
-  }
-
-  private final Map<Priority, ExecutionBlockId> subQueryMap =
-      new HashMap<Priority, ExecutionBlockId>();
-
-  private AtomicLong prevReportTime = new AtomicLong(0);
-  private int reportInterval = 5 * 1000; // second
-
-  public void heartbeat() throws Exception {
-    AllocateResponse allocateResponse = allocate(context.getProgress());
-
-    List<Container> allocatedContainers = allocateResponse.getAllocatedContainers();
-
-    long currentTime = System.currentTimeMillis();
-    if ((currentTime - prevReportTime.longValue()) >= reportInterval) {
-      LOG.debug("Available Cluster Nodes: " + allocateResponse.getNumClusterNodes());
-      LOG.debug("Num of Allocated Containers: " + allocatedContainers.size());
-      LOG.info("Available Resource: " + allocateResponse.getAvailableResources());
-      prevReportTime.set(currentTime);
-    }
-
-    if (allocatedContainers.size() > 0) {
-      LOG.info("================================================================");
-      for (Container container : allocateResponse.getAllocatedContainers()) {
-        LOG.info("> Container Id: " + container.getId());
-        LOG.info("> Node Id: " + container.getNodeId());
-        LOG.info("> Resource (Mem): " + container.getResource().getMemory());
-        LOG.info("> Priority: " + container.getPriority());
-      }
-      LOG.info("================================================================");
-
-      Map<ExecutionBlockId, List<Container>> allocated = new HashMap<ExecutionBlockId, List<Container>>();
-      for (Container container : allocatedContainers) {
-        ExecutionBlockId executionBlockId = subQueryMap.get(container.getPriority());
-        SubQueryState state = context.getSubQuery(executionBlockId).getState();
-        if (!(SubQuery.isRunningState(state))) {
-          releaseAssignedContainer(container.getId());
-        } else {
-          if (allocated.containsKey(executionBlockId)) {
-            allocated.get(executionBlockId).add(container);
-          } else {
-            allocated.put(executionBlockId, Lists.newArrayList(container));
-          }
-        }
-      }
-
-      for (Entry<ExecutionBlockId, List<Container>> entry : allocated.entrySet()) {
-        eventHandler.handle(new SubQueryContainerAllocationEvent(entry.getKey(), entry.getValue()));
-      }
-    }
-  }
-
-  @Override
-  public void handle(ContainerAllocationEvent event) {
-
-    if (event.getType() == ContainerAllocatorEventType.CONTAINER_REQ) {
-      LOG.info(event);
-      subQueryMap.put(event.getPriority(), event.getExecutionBlockId());
-      addContainerRequest(new ContainerRequest(event.getCapability(), null, null,
-          event.getPriority()));
-
-    } else if (event.getType() == ContainerAllocatorEventType.CONTAINER_DEALLOCATE) {
-      LOG.info(event);
-    } else {
-      LOG.info(event);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/882f92c6/tajo-core/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java
deleted file mode 100644
index 6d5268c..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java
+++ /dev/null
@@ -1,349 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.rm;
-
-import com.google.protobuf.RpcCallback;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.records.*;
-import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.apache.hadoop.yarn.client.api.YarnClientApplication;
-import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.proto.YarnProtos;
-import org.apache.hadoop.yarn.server.utils.BuilderUtils;
-import org.apache.hadoop.yarn.util.Records;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.TajoProtos;
-import org.apache.tajo.exception.UnimplementedException;
-import org.apache.tajo.ipc.TajoMasterProtocol;
-import org.apache.tajo.master.TajoMaster;
-import org.apache.tajo.master.YarnContainerProxy;
-import org.apache.tajo.master.querymaster.QueryInProgress;
-import org.apache.tajo.util.ApplicationIdUtils;
-import org.apache.tajo.worker.TajoWorker;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.security.PrivilegedAction;
-import java.util.*;
-
-import static org.apache.tajo.ipc.TajoMasterProtocol.WorkerAllocatedResource;
-import static org.apache.tajo.ipc.TajoMasterProtocol.WorkerResourceAllocationResponse;
-
-public class YarnTajoResourceManager extends AbstractService implements WorkerResourceManager {
-  private static final Log LOG = LogFactory.getLog(YarnTajoResourceManager.class);
-
-  private YarnClient yarnClient;
-  private ApplicationMasterProtocol rmClient;
-  private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
-  private Configuration conf;
-  private TajoMaster.MasterContext masterContext;
-
-  public YarnTajoResourceManager() {
-    super(YarnTajoResourceManager.class.getSimpleName());
-  }
-
-  public YarnTajoResourceManager(TajoMaster.MasterContext masterContext) {
-    super(YarnTajoResourceManager.class.getSimpleName());
-    this.masterContext = masterContext;
-  }
-
-  @Override
-  public void stop() {
-  }
-
-  @Override
-  public Map<String, Worker> getWorkers() {
-    return new HashMap<String, Worker>();
-  }
-
-  @Override
-  public Map<String, Worker> getInactiveWorkers() {
-    return new HashMap<String, Worker>();
-  }
-
-  public Collection<String> getQueryMasters() {
-    return new ArrayList<String>();
-  }
-
-  public TajoMasterProtocol.ClusterResourceSummary getClusterResourceSummary() {
-    return TajoMasterProtocol.ClusterResourceSummary.newBuilder()
-        .setNumWorkers(0)
-        .setTotalCpuCoreSlots(0)
-        .setTotalDiskSlots(0)
-        .setTotalMemoryMB(0)
-        .setTotalAvailableCpuCoreSlots(0)
-        .setTotalAvailableDiskSlots(0)
-        .setTotalAvailableMemoryMB(0)
-        .build();
-  }
-
-  @Override
-  public void releaseWorkerResource(YarnProtos.ContainerIdProto containerId) {
-    throw new UnimplementedException("releaseWorkerResource");
-  }
-
-  @Override
-  public WorkerAllocatedResource allocateQueryMaster(QueryInProgress queryInProgress) {
-    throw new UnimplementedException("allocateQueryMaster");
-  }
-
-  @Override
-  public void allocateWorkerResources(
-      TajoMasterProtocol.WorkerResourceAllocationRequest request,
-      RpcCallback<WorkerResourceAllocationResponse> rpcCallBack) {
-    throw new UnimplementedException("allocateWorkerResources");
-  }
-
-  @Override
-  public void init(Configuration conf) {
-    this.conf = conf;
-    connectYarnClient();
-
-    final YarnConfiguration yarnConf = new YarnConfiguration(conf);
-    final YarnRPC rpc = YarnRPC.create(conf);
-    final InetSocketAddress rmAddress = conf.getSocketAddr(
-        YarnConfiguration.RM_SCHEDULER_ADDRESS,
-        YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
-        YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
-
-    UserGroupInformation currentUser;
-    try {
-      currentUser = UserGroupInformation.getCurrentUser();
-    } catch (IOException e) {
-      throw new YarnRuntimeException(e);
-    }
-
-    rmClient = currentUser.doAs(new PrivilegedAction<ApplicationMasterProtocol>() {
-      @Override
-      public ApplicationMasterProtocol run() {
-        return (ApplicationMasterProtocol) rpc.getProxy(ApplicationMasterProtocol.class, rmAddress, yarnConf);
-      }
-    });
-  }
-
-  @Override
-  public String getSeedQueryId() throws IOException {
-    try {
-      YarnClientApplication app = yarnClient.createApplication();
-      return app.getApplicationSubmissionContext().getApplicationId().toString();
-    } catch (Exception e) {
-      LOG.error(e.getMessage(), e);
-
-      throw new IOException(e.getMessage(), e);
-    }
-  }
-
-  @Override
-  public void stopQueryMaster(QueryId queryId) {
-    try {
-      FinalApplicationStatus appStatus = FinalApplicationStatus.UNDEFINED;
-      QueryInProgress queryInProgress = masterContext.getQueryJobManager().getQueryInProgress(queryId);
-      if(queryInProgress == null) {
-        return;
-      }
-      TajoProtos.QueryState state = queryInProgress.getQueryInfo().getQueryState();
-      if (state == TajoProtos.QueryState.QUERY_SUCCEEDED) {
-        appStatus = FinalApplicationStatus.SUCCEEDED;
-      } else if (state == TajoProtos.QueryState.QUERY_FAILED || state == TajoProtos.QueryState.QUERY_ERROR) {
-        appStatus = FinalApplicationStatus.FAILED;
-      } else if (state == TajoProtos.QueryState.QUERY_ERROR) {
-        appStatus = FinalApplicationStatus.FAILED;
-      }
-      FinishApplicationMasterRequest request = recordFactory
-          .newRecordInstance(FinishApplicationMasterRequest.class);
-      request.setFinalApplicationStatus(appStatus);
-      request.setDiagnostics("QueryMaster shutdown by TajoMaster.");
-      rmClient.finishApplicationMaster(request);
-    } catch (Exception e) {
-      LOG.error(e.getMessage(), e);
-    }
-  }
-
-  private void connectYarnClient() {
-    this.yarnClient = new YarnClientImpl();
-    this.yarnClient.init(conf);
-    this.yarnClient.start();
-  }
-
-  private ApplicationAttemptId allocateAndLaunchQueryMaster(QueryInProgress queryInProgress) throws IOException, YarnException {
-    QueryId queryId = queryInProgress.getQueryId();
-    ApplicationId appId = ApplicationIdUtils.queryIdToAppId(queryId);
-
-    LOG.info("Allocate and launch ApplicationMaster for QueryMaster: queryId=" +
-        queryId + ", appId=" + appId);
-
-    ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class);
-
-    // set the application id
-    appContext.setApplicationId(appId);
-    // set the application name
-    appContext.setApplicationName("Tajo");
-
-    Priority pri = Records.newRecord(Priority.class);
-    pri.setPriority(5);
-    appContext.setPriority(pri);
-
-    // Set the queue to which this application is to be submitted in the RM
-    appContext.setQueue("default");
-
-    ContainerLaunchContext commonContainerLaunchContext =
-        YarnContainerProxy.createCommonContainerLaunchContext(masterContext.getConf(), queryId.toString(), true);
-
-    // Setup environment by cloning from common env.
-    Map<String, String> env = commonContainerLaunchContext.getEnvironment();
-    Map<String, String> myEnv = new HashMap<String, String>(env.size());
-    myEnv.putAll(env);
-
-    ////////////////////////////////////////////////////////////////////////////
-    // Set the local resources
-    ////////////////////////////////////////////////////////////////////////////
-    // Set the necessary command to execute the application master
-    Vector<CharSequence> vargs = new Vector<CharSequence>(30);
-
-    // Set java executable command
-    //LOG.info("Setting up app master command");
-    vargs.add("${JAVA_HOME}" + "/bin/java");
-    // Set Xmx based on am memory size
-    String jvmOptions = masterContext.getConf().get("tajo.rm.yarn.querymaster.jvm.option", "-Xmx2000m");
-
-    for(String eachToken: jvmOptions.split((" "))) {
-      vargs.add(eachToken);
-    }
-    // Set Remote Debugging
-    //if (!context.getQuery().getSubQuery(event.getExecutionBlockId()).isLeafQuery()) {
-    //vargs.add("-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5005");
-    //}
-    // Set class name
-    vargs.add(TajoWorker.class.getCanonicalName());
-    vargs.add("qm");
-    vargs.add(queryId.toString()); // queryId
-    vargs.add(masterContext.getTajoMasterService().getBindAddress().getHostName() + ":" +
-        masterContext.getTajoMasterService().getBindAddress().getPort());
-
-    vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
-    vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
-
-    // Get final commmand
-    StringBuilder command = new StringBuilder();
-    for (CharSequence str : vargs) {
-      command.append(str).append(" ");
-    }
-
-    LOG.info("Completed setting up QueryMasterRunner command " + command.toString());
-    List<String> commands = new ArrayList<String>();
-    commands.add(command.toString());
-
-    final Resource resource = Records.newRecord(Resource.class);
-    // TODO - get default value from conf
-    resource.setMemory(2000);
-    resource.setVirtualCores(1);
-
-    Map<String, ByteBuffer> myServiceData = new HashMap<String, ByteBuffer>();
-
-    ContainerLaunchContext masterContainerContext = BuilderUtils.newContainerLaunchContext(
-        commonContainerLaunchContext.getLocalResources(),
-        myEnv,
-        commands,
-        myServiceData,
-        null,
-        new HashMap<ApplicationAccessType, String>(2)
-    );
-
-    appContext.setAMContainerSpec(masterContainerContext);
-
-    LOG.info("Submitting QueryMaster to ResourceManager");
-    yarnClient.submitApplication(appContext);
-
-    ApplicationReport appReport = monitorApplication(appId, EnumSet.of(YarnApplicationState.ACCEPTED));
-    ApplicationAttemptId attemptId = appReport.getCurrentApplicationAttemptId();
-
-    LOG.info("Launching QueryMaster with appAttemptId: " + attemptId);
-
-    return attemptId;
-  }
-
-  private ApplicationReport monitorApplication(ApplicationId appId,
-                                               Set<YarnApplicationState> finalState) throws IOException, YarnException {
-
-    long sleepTime = 100;
-    int count = 1;
-    while (true) {
-      // Get application report for the appId we are interested in
-      ApplicationReport report = yarnClient.getApplicationReport(appId);
-
-      LOG.info("Got application report from ASM for" + ", appId="
-          + appId.getId() + ", appAttemptId="
-          + report.getCurrentApplicationAttemptId() + ", clientToken="
-          + report.getClientToAMToken() + ", appDiagnostics="
-          + report.getDiagnostics() + ", appMasterHost=" + report.getHost()
-          + ", appQueue=" + report.getQueue() + ", appMasterRpcPort="
-          + report.getRpcPort() + ", appStartTime=" + report.getStartTime()
-          + ", yarnAppState=" + report.getYarnApplicationState().toString()
-          + ", distributedFinalState="
-          + report.getFinalApplicationStatus().toString() + ", appTrackingUrl="
-          + report.getTrackingUrl() + ", appUser=" + report.getUser());
-
-      YarnApplicationState state = report.getYarnApplicationState();
-      if (finalState.contains(state)) {
-        return report;
-      }
-      try {
-        Thread.sleep(sleepTime);
-        sleepTime = count * 100;
-        if(count < 10) {
-          count++;
-        }
-      } catch (InterruptedException e) {
-        //LOG.debug("Thread sleep in monitoring loop interrupted");
-      }
-    }
-  }
-
-  public boolean isQueryMasterStopped(QueryId queryId) {
-    ApplicationId appId = ApplicationIdUtils.queryIdToAppId(queryId);
-    try {
-      ApplicationReport report = yarnClient.getApplicationReport(appId);
-      YarnApplicationState state = report.getYarnApplicationState();
-      return EnumSet.of(
-          YarnApplicationState.FINISHED,
-          YarnApplicationState.KILLED,
-          YarnApplicationState.FAILED).contains(state);
-    } catch (YarnException e) {
-      LOG.error(e.getMessage(), e);
-      return false;
-    } catch (IOException e) {
-      LOG.error(e.getMessage(), e);
-      return false;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/882f92c6/tajo-core/src/main/java/org/apache/tajo/worker/YarnResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/YarnResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/YarnResourceAllocator.java
deleted file mode 100644
index 1771255..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/worker/YarnResourceAllocator.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.worker;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.Service;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
-import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.proto.YarnProtos;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.master.TaskRunnerGroupEvent;
-import org.apache.tajo.master.TaskRunnerLauncher;
-import org.apache.tajo.master.YarnTaskRunnerLauncherImpl;
-import org.apache.tajo.master.event.ContainerAllocatorEventType;
-import org.apache.tajo.master.querymaster.QueryMasterTask;
-import org.apache.tajo.master.rm.YarnRMContainerAllocator;
-
-public class YarnResourceAllocator extends AbstractResourceAllocator {
-  private YarnRMContainerAllocator rmAllocator;
-
-  private TaskRunnerLauncher taskRunnerLauncher;
-
-  private YarnRPC yarnRPC;
-
-  private YarnClient yarnClient;
-
-  private static final Log LOG = LogFactory.getLog(YarnResourceAllocator.class.getName());
-
-  private QueryMasterTask.QueryMasterTaskContext queryTaskContext;
-
-  private TajoConf systemConf;
-
-  public YarnResourceAllocator(QueryMasterTask.QueryMasterTaskContext queryTaskContext) {
-    this.queryTaskContext = queryTaskContext;
-  }
-
-  @Override
-  public ContainerId makeContainerId(YarnProtos.ContainerIdProto containerId) {
-    return new ContainerIdPBImpl(containerId);
-  }
-
-  @Override
-  public void allocateTaskWorker() {
-  }
-
-  @Override
-  public int calculateNumRequestContainers(TajoWorker.WorkerContext workerContext,
-                                           int numTasks,
-                                           int memoryMBPerTask) {
-    int numClusterNodes = workerContext.getNumClusterNodes();
-
-    TajoConf conf =  (TajoConf)workerContext.getQueryMaster().getConfig();
-    int workerNum = conf.getIntVar(TajoConf.ConfVars.YARN_RM_WORKER_NUMBER_PER_NODE);
-    return numClusterNodes == 0 ? numTasks: Math.min(numTasks, numClusterNodes * workerNum);
-  }
-
-  @Override
-  public void init(Configuration conf) {
-    systemConf = (TajoConf)conf;
-
-    yarnRPC = YarnRPC.create(systemConf);
-
-    connectYarnClient();
-
-    taskRunnerLauncher = new YarnTaskRunnerLauncherImpl(queryTaskContext, yarnRPC);
-    addService((Service) taskRunnerLauncher);
-    queryTaskContext.getDispatcher().register(TaskRunnerGroupEvent.EventType.class, taskRunnerLauncher);
-
-    rmAllocator = new YarnRMContainerAllocator(queryTaskContext);
-    addService(rmAllocator);
-    queryTaskContext.getDispatcher().register(ContainerAllocatorEventType.class, rmAllocator);
-    super.init(conf);
-  }
-
-  @Override
-  public void stop() {
-    try {
-      this.yarnClient.stop();
-    } catch (Exception e) {
-      LOG.error(e.getMessage(), e);
-    }
-    super.stop();
-  }
-
-  @Override
-  public void start() {
-    super.start();
-  }
-
-  private void connectYarnClient() {
-    this.yarnClient = new YarnClientImpl();
-    this.yarnClient.init(systemConf);
-    this.yarnClient.start();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/882f92c6/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
index ed5e4bc..010faa8 100644
--- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -40,7 +40,6 @@ import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.master.TajoMaster;
 import org.apache.tajo.master.rm.TajoWorkerResourceManager;
-import org.apache.tajo.master.rm.YarnTajoResourceManager;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.util.NetUtils;
 import org.apache.tajo.worker.TajoWorker;
@@ -93,12 +92,7 @@ public class TajoTestingCluster {
   void initPropertiesAndConfigs() {
     if (System.getProperty(ConfVars.RESOURCE_MANAGER_CLASS.varname) != null) {
       String testResourceManager = System.getProperty(ConfVars.RESOURCE_MANAGER_CLASS.varname);
-      Preconditions.checkState(
-          testResourceManager.equals(TajoWorkerResourceManager.class.getCanonicalName()) ||
-              testResourceManager.equals(YarnTajoResourceManager.class.getCanonicalName()),
-          ConfVars.RESOURCE_MANAGER_CLASS.varname + " must be either " + TajoWorkerResourceManager.class.getCanonicalName() + " or " +
-              YarnTajoResourceManager.class.getCanonicalName() +"."
-      );
+      Preconditions.checkState(testResourceManager.equals(TajoWorkerResourceManager.class.getCanonicalName()));
       conf.set(ConfVars.RESOURCE_MANAGER_CLASS.varname, System.getProperty(ConfVars.RESOURCE_MANAGER_CLASS.varname));
     }
     conf.setInt(ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB.varname, 1024);

http://git-wip-us.apache.org/repos/asf/tajo/blob/882f92c6/tajo-maven-plugins/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-maven-plugins/pom.xml b/tajo-maven-plugins/pom.xml
index 7de67ef..5b7c63d 100644
--- a/tajo-maven-plugins/pom.xml
+++ b/tajo-maven-plugins/pom.xml
@@ -26,6 +26,7 @@
   <artifactId>tajo-maven-plugins</artifactId>
   <packaging>maven-plugin</packaging>
   <name>Tajo Maven Plugins</name>
+  <version>0.8.0-SNAPSHOT</version>
   <properties>
     <maven.dependency.version>3.0</maven.dependency.version>
   </properties>

http://git-wip-us.apache.org/repos/asf/tajo/blob/882f92c6/tajo-project/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-project/pom.xml b/tajo-project/pom.xml
index 8e30759..cf0d538 100644
--- a/tajo-project/pom.xml
+++ b/tajo-project/pom.xml
@@ -554,6 +554,12 @@
             </lifecycleMappingMetadata>
           </configuration>
         </plugin>
+
+        <plugin>
+          <groupId>org.apache.tajo</groupId>
+          <artifactId>tajo-maven-plugins</artifactId>
+          <version>${tajo.version}</version>
+        </plugin>
       </plugins>
     </pluginManagement>