You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/04/30 13:53:28 UTC

git commit: TAJO-44: Adopt AMRMClient to RMContainerAllocator, RMCommunicator. (hyunsik)

Updated Branches:
  refs/heads/master fef3dd509 -> 39bb519c8


TAJO-44: Adopt AMRMClient to RMContainerAllocator, RMCommunicator. (hyunsik)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/39bb519c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/39bb519c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/39bb519c

Branch: refs/heads/master
Commit: 39bb519c8dbcb5915cdd618e8113df9bc1284182
Parents: fef3dd5
Author: Hyunsik Choi <hy...@apache.org>
Authored: Tue Apr 30 20:03:28 2013 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Tue Apr 30 20:52:26 2013 +0900

----------------------------------------------------------------------
 CHANGES.txt                                        |    5 +-
 .../src/main/java/tajo/master/QueryMaster.java     |    7 +-
 .../main/java/tajo/master/rm/RMCommunicator.java   |  349 ---------------
 .../java/tajo/master/rm/RMContainerAllocator.java  |  300 +++++--------
 4 files changed, 122 insertions(+), 539 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/39bb519c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 33b422c..e2cae30 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -8,6 +8,8 @@ Release 0.2.0 - unreleased
 
   IMPROVEMENTS
 
+    TAJO-44: Adopt AMRMClient to RMContainerAllocator, RMCommunicator. (hyunsik)
+    
     TAJO-42: Divide SubQuery into FSM and execution block parts. (hyunsik)
 
     TAJO-32: Cleanup TaskRunner. (hyunsik)
@@ -30,7 +32,8 @@ Release 0.2.0 - unreleased
 
   BUG FIXES
 
-    TAJO-38: Update class comment in TaskAttemptContext from Korean to English (hsaputra)
+    TAJO-38: Update class comment in TaskAttemptContext from Korean to English 
+    (hsaputra)
 
     TAJO-15: The Integration test is getting hanged on Mac OS X. (hyunsik)
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/39bb519c/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryMaster.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryMaster.java
index 98878d3..86581f0 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryMaster.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryMaster.java
@@ -222,7 +222,6 @@ public class QueryMaster extends CompositeService implements EventHandler {
 
   public class QueryContext {
     private QueryConf conf;
-    int clusterNode;
     public Map<ContainerId, Container> containers = new ConcurrentHashMap<ContainerId, Container>();
     int minCapability;
     int maxCapability;
@@ -296,11 +295,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
     }
 
     public int getNumClusterNode() {
-      return clusterNode;
-    }
-
-    public void setNumClusterNode(int num) {
-      clusterNode = num;
+      return rmAllocator.getClusterNodeCount();
     }
 
     public CatalogService getCatalog() {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/39bb519c/tajo-core/tajo-core-backend/src/main/java/tajo/master/rm/RMCommunicator.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/rm/RMCommunicator.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/rm/RMCommunicator.java
deleted file mode 100644
index 90a0737..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/rm/RMCommunicator.java
+++ /dev/null
@@ -1,349 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.master.rm;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.yarn.YarnException;
-import org.apache.hadoop.yarn.api.AMRMProtocol;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.ContainerManager;
-import org.apache.hadoop.yarn.api.protocolrecords.*;
-import org.apache.hadoop.yarn.api.records.*;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
-import org.apache.hadoop.yarn.service.AbstractService;
-import org.apache.hadoop.yarn.util.BuilderUtils;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.util.ProtoUtils;
-import tajo.TajoProtos.QueryState;
-import tajo.master.Query;
-import tajo.master.QueryMaster.QueryContext;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.security.PrivilegedAction;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-public abstract class RMCommunicator extends AbstractService {
-  private static final Log LOG = LogFactory.getLog(RMCommunicator.class);
-
-  protected static final RecordFactory recordFactory =
-      RecordFactoryProvider.getRecordFactory(null);
-
-  // Resource Manager RPC
-  private YarnRPC rpc;
-  protected AMRMProtocol scheduler;
-
-  // For Query
-  protected QueryContext context;
-  protected Query query;
-  private int rmPollInterval = 1000;//millis
-  protected ApplicationId applicationId;
-  protected ApplicationAttemptId applicationAttemptId;
-  protected Map<ApplicationAccessType, String> applicationACLs;
-
-  // RMCommunicator
-  private final AtomicBoolean stopped;
-  protected Thread allocatorThread;
-
-  // resource
-  private Resource minContainerCapability;
-  private Resource maxContainerCapability;
-
-  // Has a signal (SIGTERM etc) been issued?
-  protected volatile boolean isSignalled = false;
-
-  public RMCommunicator(QueryContext context) {
-    super(RMCommunicator.class.getName());
-    this.context = context;
-    this.applicationId = context.getApplicationId();
-    this.applicationAttemptId = context.getApplicationAttemptId();
-
-    stopped = new AtomicBoolean(false);
-  }
-
-  @Override
-  public void init(Configuration conf) {
-    LOG.info("defaultFS: " + conf.get("fs.default.name"));
-    LOG.info("defaultFS: " + conf.get("fs.defaultFS"));
-
-    super.init(conf);
-  }
-
-  public void start() {
-    this.query = context.getQuery();
-    rpc = YarnRPC.create(getConfig());
-    this.scheduler = createSchedulerProxy();
-
-
-    AllocateRequest allocateRequest = BuilderUtils.newAllocateRequest(
-        applicationAttemptId, 0, 0.0f,
-        new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>());
-    try {
-      AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
-      context.setNumClusterNode(allocateResponse.getNumClusterNodes());
-    } catch (YarnRemoteException e) {
-      e.printStackTrace();
-    }
-    register();
-    startAllocatorThread();
-    super.start();
-  }
-
-  public void stop() {
-    if (stopped.getAndSet(true)) {
-      // return if already stopped
-      return;
-    }
-    allocatorThread.interrupt();
-    try {
-      allocatorThread.join();
-    } catch (InterruptedException ie) {
-      LOG.warn("InterruptedException while stopping", ie);
-    }
-    unregister();
-    super.stop();
-  }
-
-  protected void register() {
-    //Register
-    try {
-      RegisterApplicationMasterRequest request =
-          recordFactory.newRecordInstance(RegisterApplicationMasterRequest.class);
-      request.setApplicationAttemptId(applicationAttemptId);
-      LOG.info("Tracking Addr: " + context.getRpcAddress());
-      request.setHost(context.getRpcAddress().getHostName());
-      request.setRpcPort(context.getRpcAddress().getPort());
-      // TODO - to be changed to http server
-      //request.setTrackingUrl("http://" + NetUtils.getIpPortString(context.getRpcAddress()));
-      request.setTrackingUrl("http://localhost:1234");
-      RegisterApplicationMasterResponse response =
-          scheduler.registerApplicationMaster(request);
-      minContainerCapability = response.getMinimumResourceCapability();
-      maxContainerCapability = response.getMaximumResourceCapability();
-      context.setMaxContainerCapability(maxContainerCapability.getMemory());
-      context.setMinContainerCapability(minContainerCapability.getMemory());
-      this.applicationACLs = response.getApplicationACLs();
-      LOG.info("minContainerCapability: " + minContainerCapability.getMemory());
-      LOG.info("maxContainerCapability: " + maxContainerCapability.getMemory());
-    } catch (Exception are) {
-      LOG.error("Exception while registering", are);
-      throw new YarnException(are);
-    }
-  }
-
-  protected void unregister() {
-    try {
-      FinalApplicationStatus finishState = FinalApplicationStatus.UNDEFINED;
-      if (query.getState() == QueryState.QUERY_SUCCEEDED) {
-        finishState = FinalApplicationStatus.SUCCEEDED;
-      } else if (query.getState() == QueryState.QUERY_KILLED
-          || (query.getState() == QueryState.QUERY_RUNNING && isSignalled)) {
-        finishState = FinalApplicationStatus.KILLED;
-      } else if (query.getState() == QueryState.QUERY_FAILED
-          || query.getState() == QueryState.QUERY_ERROR) {
-        finishState = FinalApplicationStatus.FAILED;
-      }
-      StringBuffer sb = new StringBuffer();
-//      for (String s : query.getDiagnostics()) {
-//        sb.append(s).append("\n");
-//      }
-      LOG.info("Setting job diagnostics to " + sb.toString());
-
-      // TODO - to be implemented
-//      String historyUrl = JobHistoryUtils.getHistoryUrl(getConfig(),
-//          context.getApplicationId());
-//      LOG.info("History url is " + historyUrl);
-
-      FinishApplicationMasterRequest request =
-          recordFactory.newRecordInstance(FinishApplicationMasterRequest.class);
-      request.setAppAttemptId(this.applicationAttemptId);
-      request.setFinishApplicationStatus(finishState);
-      request.setDiagnostics(""); // TODO - tobe implemented
-      request.setTrackingUrl("");
-      scheduler.finishApplicationMaster(request);
-    } catch(Exception are) {
-      LOG.error("Exception while unregistering ", are);
-    }
-  }
-
-  protected Resource getMinContainerCapability() {
-    return minContainerCapability;
-  }
-
-  protected Resource getMaxContainerCapability() {
-    return maxContainerCapability;
-  }
-
-  public abstract void heartbeat() throws Exception;
-
-  protected AMRMProtocol createSchedulerProxy() {
-    final Configuration conf = getConfig();
-    final InetSocketAddress serviceAddr = conf.getSocketAddr(
-        YarnConfiguration.RM_SCHEDULER_ADDRESS,
-        YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
-        YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
-
-    UserGroupInformation currentUser;
-    try {
-      currentUser = UserGroupInformation.getCurrentUser();
-    } catch (IOException e) {
-      throw new YarnException(e);
-    }
-
-    if (UserGroupInformation.isSecurityEnabled()) {
-      String tokenURLEncodedStr = System.getenv().get(
-          ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME);
-      Token<? extends TokenIdentifier> token = new Token<TokenIdentifier>();
-
-      try {
-        token.decodeFromUrlString(tokenURLEncodedStr);
-      } catch (IOException e) {
-        throw new YarnException(e);
-      }
-
-      SecurityUtil.setTokenService(token, serviceAddr);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("AppMasterToken is " + token);
-      }
-      currentUser.addToken(token);
-    }
-
-    return currentUser.doAs(new PrivilegedAction<AMRMProtocol>() {
-      @Override
-      public AMRMProtocol run() {
-        return (AMRMProtocol) rpc.getProxy(AMRMProtocol.class,
-            serviceAddr, conf);
-      }
-    });
-  }
-
-  protected void startAllocatorThread() {
-    allocatorThread = new Thread(new Runnable() {
-      @Override
-      public void run() {
-        while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
-          try {
-            Thread.sleep(rmPollInterval);
-            try {
-              heartbeat();
-            } catch (YarnException e) {
-              LOG.error("Error communicating with RM: " + e.getMessage() , e);
-              return;
-            } catch (Exception e) {
-              LOG.error("ERROR IN CONTACTING RM. ", e);
-              // TODO: for other exceptions
-            }
-          } catch (InterruptedException e) {
-            if (!stopped.get()) {
-              LOG.warn("Allocated thread interrupted. Returning.");
-            }
-            return;
-          }
-        }
-      }
-    });
-    allocatorThread.setName("RMCommunicator Allocator");
-    allocatorThread.start();
-  }
-
-  public void setSignalled(boolean isSignalled) {
-    this.isSignalled = isSignalled;
-    LOG.info("RMCommunicator notified that iSignalled is: "
-        + isSignalled);
-  }
-
-  protected ContainerManager getCMProxy(ContainerId containerID,
-                                        final String containerManagerBindAddr, ContainerToken containerToken)
-      throws IOException {
-
-    final InetSocketAddress cmAddr =
-        NetUtils.createSocketAddr(containerManagerBindAddr);
-    UserGroupInformation user = UserGroupInformation.getCurrentUser();
-
-    if (UserGroupInformation.isSecurityEnabled()) {
-      Token<ContainerTokenIdentifier> token =
-          ProtoUtils.convertFromProtoFormat(containerToken, cmAddr);
-      // the user in createRemoteUser in this context has to be ContainerID
-      user = UserGroupInformation.createRemoteUser(containerID.toString());
-      user.addToken(token);
-    }
-
-    ContainerManager proxy = user
-        .doAs(new PrivilegedAction<ContainerManager>() {
-          @Override
-          public ContainerManager run() {
-            return (ContainerManager) rpc.getProxy(ContainerManager.class,
-                cmAddr, getConfig());
-          }
-        });
-    return proxy;
-  }
-
-  public static ContainerLaunchContext newContainerLaunchContext(
-      ContainerId containerID, String user, Resource assignedCapability,
-      Map<String, LocalResource> localResources,
-      Map<String, String> environment, List<String> commands,
-      Map<String, ByteBuffer> serviceData, ByteBuffer containerTokens,
-      Map<ApplicationAccessType, String> acls) {
-    ContainerLaunchContext container = recordFactory
-        .newRecordInstance(ContainerLaunchContext.class);
-    container.setContainerId(containerID);
-    container.setUser(user);
-    container.setResource(assignedCapability);
-    container.setLocalResources(localResources);
-    container.setEnvironment(environment);
-    container.setCommands(commands);
-    container.setServiceData(serviceData);
-    container.setContainerTokens(containerTokens);
-    container.setApplicationACLs(acls);
-    return container;
-  }
-
-  private LocalResource createApplicationResource(FileContext fs, Path p, LocalResourceType type)
-      throws IOException {
-    LocalResource rsrc = recordFactory.newRecordInstance(LocalResource.class);
-    FileStatus rsrcStat = fs.getFileStatus(p);
-    rsrc.setResource(ConverterUtils.getYarnUrlFromPath(fs
-        .getDefaultFileSystem().resolvePath(rsrcStat.getPath())));
-    rsrc.setSize(rsrcStat.getLen());
-    rsrc.setTimestamp(rsrcStat.getModificationTime());
-    rsrc.setType(type);
-    rsrc.setVisibility(LocalResourceVisibility.APPLICATION);
-    return rsrc;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/39bb519c/tajo-core/tajo-core-backend/src/main/java/tajo/master/rm/RMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/rm/RMContainerAllocator.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/rm/RMContainerAllocator.java
index 7bb7bf0..4c78c26 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/rm/RMContainerAllocator.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/rm/RMContainerAllocator.java
@@ -21,158 +21,126 @@ package tajo.master.rm;
 import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.YarnException;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.AMResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.client.AMRMClientImpl;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
-import org.apache.hadoop.yarn.util.BuilderUtils;
-import org.apache.hadoop.yarn.util.Records;
 import tajo.SubQueryId;
+import tajo.TajoProtos.QueryState;
 import tajo.master.QueryMaster.QueryContext;
 import tajo.master.SubQueryState;
-import tajo.master.event.*;
+import tajo.master.event.ContainerAllocationEvent;
+import tajo.master.event.ContainerAllocatorEventType;
+import tajo.master.event.SubQueryContainerAllocationEvent;
 
-import java.util.*;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicBoolean;
 
-public class RMContainerAllocator extends RMCommunicator
+public class RMContainerAllocator extends AMRMClientImpl
     implements EventHandler<ContainerAllocationEvent> {
 
   /** Class Logger */
   private static final Log LOG = LogFactory.getLog(RMContainerAllocator.
       class.getName());
 
+  private QueryContext context;
   private final EventHandler eventHandler;
 
   public RMContainerAllocator(QueryContext context) {
-    super(context);
+    super(context.getApplicationAttemptId());
+    this.context = context;
     this.eventHandler = context.getDispatcher().getEventHandler();
   }
 
-  private Map<Priority, SubQueryId> subQueryMap
-      = new HashMap<Priority, SubQueryId>();
-
-  @Override
-  public void heartbeat() throws Exception {
-    List<Container> allocatedContainers = getResources();
-    Map<SubQueryId, List<Container>> allocated = new HashMap<SubQueryId, List<Container>>();
-    if (allocatedContainers.size() > 0) {
-      for (Container container : allocatedContainers) {
-        SubQueryId subQueryId = subQueryMap.get(container.getPriority());
-        if (!subQueryMap.containsKey(container.getPriority()) ||
-            query.getSubQuery(subQueryId).getState() == SubQueryState.SUCCEEDED) {
-          release.add(container.getId());
-          synchronized (subQueryMap) {
-            subQueryMap.remove(container.getPriority());
-          }
-        } else {
-          if (allocated.containsKey(subQueryId)) {
-            allocated.get(subQueryId).add(container);
-          } else {
-            allocated.put(subQueryId, Lists.newArrayList(container));
-          }
-        }
-      }
-
-      for (Entry<SubQueryId, List<Container>> entry : allocated.entrySet()) {
-        eventHandler.handle(
-            new SubQueryContainerAllocationEvent(entry.getKey(),
-            entry.getValue()));
-      }
-    }
+  public void init(Configuration conf) {
+    super.init(conf);
   }
 
-  @Override
-  public void handle(ContainerAllocationEvent event) {
-
-    if (event.getType() == ContainerAllocatorEventType.CONTAINER_REQ) {
-      LOG.info(event);
-      assign(event);
+  public void start() {
+    super.start();
 
-    } else if (event.getType() == ContainerAllocatorEventType.CONTAINER_DEALLOCATE) {
-      LOG.info(event);
-    } else {
-      LOG.info(event);
+    RegisterApplicationMasterResponse response;
+    try {
+      response = registerApplicationMaster("localhost", 10080, "http://localhost:1234");
+      context.setMaxContainerCapability(response.getMaximumResourceCapability().getMemory());
+      context.setMinContainerCapability(response.getMinimumResourceCapability().getMemory());
+    } catch (YarnRemoteException e) {
+      LOG.error(e);
     }
-  }
 
-  public void assign(ContainerAllocationEvent event) {
-    SubQueryId subQueryId = event.getSubQueryId();
-    subQueryMap.put(event.getPriority(), event.getSubQueryId());
+    startAllocatorThread();
+  }
 
-    int memRequred;
-    int minContainerCapability;
-    int supportedMaxContainerCapability =
-        getMaxContainerCapability().getMemory();
+  protected Thread allocatorThread;
+  private final AtomicBoolean stopped = new AtomicBoolean(false);
+  private int rmPollInterval = 1000;//millis
+  protected void startAllocatorThread() {
+    allocatorThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
+          try {
+            try {
+              heartbeat();
+            } catch (YarnException e) {
+              LOG.error("Error communicating with RM: " + e.getMessage() , e);
+              return;
+            } catch (Exception e) {
+              LOG.error("ERROR IN CONTACTING RM. ", e);
+              // TODO: for other exceptions
+            }
+            Thread.sleep(rmPollInterval);
+          } catch (InterruptedException e) {
+            if (!stopped.get()) {
+              LOG.warn("Allocated thread interrupted. Returning.");
+            }
+            return;
+          }
+        }
+      }
+    });
+    allocatorThread.setName("RMContainerAllocator");
+    allocatorThread.start();
+  }
 
-    memRequred = event.getCapability().getMemory();
-    minContainerCapability = getMinContainerCapability().getMemory();
-    if (memRequred < minContainerCapability) {
-      memRequred = minContainerCapability;
+  public void stop() {
+    stopped.set(true);
+    super.stop();
+    FinalApplicationStatus finishState = FinalApplicationStatus.UNDEFINED;
+    QueryState state = context.getQuery().getState();
+    if (state == QueryState.QUERY_SUCCEEDED) {
+      finishState = FinalApplicationStatus.SUCCEEDED;
+    } else if (state == QueryState.QUERY_KILLED
+        || (state == QueryState.QUERY_RUNNING)) {
+      finishState = FinalApplicationStatus.KILLED;
+    } else if (state == QueryState.QUERY_FAILED
+        || state == QueryState.QUERY_ERROR) {
+      finishState = FinalApplicationStatus.FAILED;
     }
 
-    if (memRequred > getMaxContainerCapability().getMemory()) {
-      String diagMsg = "Task capability required is more than the supported " +
-          "max container capability in the cluster. Killing the Job. mapResourceReqt: " +
-          memRequred + " maxContainerCapability:" + supportedMaxContainerCapability;
-      LOG.info(diagMsg);
-      eventHandler.handle(new QueryDiagnosticsUpdateEvent(
-          subQueryId.getQueryId(), diagMsg));
-      eventHandler.handle(new QueryEvent(subQueryId.getQueryId(),
-          QueryEventType.KILL));
+    try {
+      unregisterApplicationMaster(finishState, "", "http://localhost:1234");
+    } catch (YarnRemoteException e) {
+      LOG.error(e);
     }
-    LOG.info("mapResourceReqt:"+memRequred);
-    /*
-    if (event.isLeafQuery() && event instanceof GrouppedContainerAllocatorEvent) {
-      GrouppedContainerAllocatorEvent allocatorEvent =
-          (GrouppedContainerAllocatorEvent) event;
-      List<ResourceRequest> requestList = new ArrayList<>();
-      for (Entry<String, Integer> request :
-          allocatorEvent.getRequestMap().entrySet()) {
-
-        ResourceRequest resReq = Records.newRecord(ResourceRequest.class);
-        // TODO - to consider the data locality
-        resReq.setHostName("*");
-        resReq.setCapability(allocatorEvent.getCapability());
-        resReq.setNumContainers(request.getValue());
-        resReq.setPriority(allocatorEvent.getPriority());
-        requestList.add(resReq);
-      }
-
-      ask.addAll(new ArrayList<>(requestList));
-      LOG.info(requestList.size());
-      LOG.info(ask.size());
-    } else {*/
-      ResourceRequest resReq = Records.newRecord(ResourceRequest.class);
-      resReq.setHostName("*");
-      resReq.setCapability(event.getCapability());
-      resReq.setNumContainers(event.getRequiredNum());
-      resReq.setPriority(event.getPriority());
-      ask.add(resReq);
-    //}
   }
 
-  Set<ResourceRequest> ask = new HashSet<ResourceRequest>();
-  Set<ContainerId> release = new HashSet<ContainerId>();
-  Resource availableResources;
-  int lastClusterNmCount = 0;
-  int clusterNmCount = 0;
-  int lastResponseID = 1;
+  private final Map<Priority, SubQueryId> subQueryMap =
+      new HashMap<Priority, SubQueryId>();
 
-  protected AMResponse makeRemoteRequest() throws YarnException, YarnRemoteException {
-    AllocateRequest allocateRequest = BuilderUtils.newAllocateRequest(
-        applicationAttemptId, lastResponseID, query.getProgress(),
-        new ArrayList<ResourceRequest>(ask), new ArrayList<ContainerId>(release));
-    AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
-    AMResponse response = allocateResponse.getAMResponse();
-    lastResponseID = response.getResponseId();
-    availableResources = response.getAvailableResources();
-    lastClusterNmCount = clusterNmCount;
-    clusterNmCount = allocateResponse.getNumClusterNodes();
+  public void heartbeat() throws Exception {
+    AMResponse response = allocate(context.getProgress()).getAMResponse();
+    List<Container> allocatedContainers = response.getAllocatedContainers();
 
-    //LOG.info("Response Id: " + response.getResponseId());
     LOG.info("Available Resource: " + response.getAvailableResources());
     LOG.info("Num of Allocated Containers: " + response.getAllocatedContainers().size());
     if (response.getAllocatedContainers().size() > 0) {
@@ -186,79 +154,45 @@ public class RMContainerAllocator extends RMCommunicator
       }
       LOG.info("================================================================");
     }
-    /*
-    LOG.info("Reboot: " + response.getReboot());
-    LOG.info("Num of Updated Node: " + response.getUpdatedNodes());
-    for (NodeReport nodeReport : response.getUpdatedNodes()) {
-      LOG.info("> Node Id: " + nodeReport.getNodeId());
-      LOG.info("> Node State: " + nodeReport.getNodeState());
-      LOG.info("> Rack Name: " + nodeReport.getRackName());
-      LOG.info("> Used: " + nodeReport.getUsed());
-    }
-    */
 
+    Map<SubQueryId, List<Container>> allocated = new HashMap<SubQueryId, List<Container>>();
+    if (allocatedContainers.size() > 0) {
+      for (Container container : allocatedContainers) {
+        SubQueryId subQueryId = subQueryMap.get(container.getPriority());
+        if (!subQueryMap.containsKey(container.getPriority()) ||
+            context.getSubQuery(subQueryId).getState() == SubQueryState.SUCCEEDED) {
+          releaseAssignedContainer(container.getId());
+          synchronized (subQueryMap) {
+            subQueryMap.remove(container.getPriority());
+          }
+        } else {
+          if (allocated.containsKey(subQueryId)) {
+            allocated.get(subQueryId).add(container);
+          } else {
+            allocated.put(subQueryId, Lists.newArrayList(container));
+          }
+        }
+      }
 
-    if (ask.size() > 0 || release.size() > 0) {
-      LOG.info("getResources() for " + applicationId + ":" + " ask="
-          + ask.size() + " release= " + release.size() + " newContainers="
-          + response.getAllocatedContainers().size() + " finishedContainers="
-          + response.getCompletedContainersStatuses().size()
-          + " resourcelimit=" + availableResources + " knownNMs="
-          + clusterNmCount);
+      for (Entry<SubQueryId, List<Container>> entry : allocated.entrySet()) {
+        eventHandler.handle(new SubQueryContainerAllocationEvent(entry.getKey(), entry.getValue()));
+      }
     }
-
-    ask.clear();
-    release.clear();
-    return response;
-  }
-
-  public Resource getAvailableResources() {
-    return availableResources;
   }
 
+  @Override
+  public void handle(ContainerAllocationEvent event) {
 
-  long retrystartTime;
-  long retryInterval = 3000;
-  private List<Container> getResources() throws Exception {
-    int headRoom = getAvailableResources() != null ? getAvailableResources().getMemory() : 0;//first time it would be null
-    AMResponse response;
-
-    /*
-    * If contact with RM is lost, the AM will wait MR_AM_TO_RM_WAIT_INTERVAL_MS
-    * milliseconds before aborting. During this interval, AM will still try
-    * to contact the RM.
-    */
-    try {
-      response = makeRemoteRequest();
-      // Reset retry count if no exception occurred.
-      retrystartTime = System.currentTimeMillis();
-    } catch (Exception e) {
-      // This can happen when the connection to the RM has gone down. Keep
-      // re-trying until the retryInterval has expired.
-      if (System.currentTimeMillis() - retrystartTime >= retryInterval) {
-        LOG.error("Could not contact RM after " + retryInterval + " milliseconds.");
-        eventHandler.handle(new QueryEvent(query.getId(),
-            QueryEventType.INTERNAL_ERROR));
-        throw new YarnException("Could not contact RM after " +
-            retryInterval + " milliseconds.");
-      }
-      // Throw this up to the caller, which may decide to ignore it and
-      // continue to attempt to contact the RM.
-      throw e;
-    }
+    if (event.getType() == ContainerAllocatorEventType.CONTAINER_REQ) {
+      LOG.info(event);
+      subQueryMap.put(event.getPriority(), event.getSubQueryId());
+      addContainerRequest(new ContainerRequest(event.getCapability(), null, null,
+          event.getPriority(), event.getRequiredNum()));
 
-    if (response.getReboot()) {
-      // This can happen if the RM has been restarted. If it is in that state,
-      // this application must clean itself up.
-      eventHandler.handle(new QueryEvent(query.getId(),
-          QueryEventType.INTERNAL_ERROR));
-      throw new YarnException("Resource Manager doesn't recognize AttemptId: " +
-          context.getApplicationId());
+    } else if (event.getType() == ContainerAllocatorEventType.CONTAINER_DEALLOCATE) {
+      LOG.info(event);
+    } else {
+      LOG.info(event);
     }
-
-    int newHeadRoom = getAvailableResources() != null ? getAvailableResources().getMemory() : 0;
-    List<Container> newContainers = response.getAllocatedContainers();
-
-    return newContainers;
   }
 }