You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text export.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/04/30 13:53:28 UTC
git commit: TAJO-44: Adopt AMRMClient to RMContainerAllocator,
RMCommunicator. (hyunsik)
Updated Branches:
refs/heads/master fef3dd509 -> 39bb519c8
TAJO-44: Adopt AMRMClient to RMContainerAllocator, RMCommunicator. (hyunsik)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/39bb519c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/39bb519c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/39bb519c
Branch: refs/heads/master
Commit: 39bb519c8dbcb5915cdd618e8113df9bc1284182
Parents: fef3dd5
Author: Hyunsik Choi <hy...@apache.org>
Authored: Tue Apr 30 20:03:28 2013 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Tue Apr 30 20:52:26 2013 +0900
----------------------------------------------------------------------
CHANGES.txt | 5 +-
.../src/main/java/tajo/master/QueryMaster.java | 7 +-
.../main/java/tajo/master/rm/RMCommunicator.java | 349 ---------------
.../java/tajo/master/rm/RMContainerAllocator.java | 300 +++++--------
4 files changed, 122 insertions(+), 539 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/39bb519c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 33b422c..e2cae30 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -8,6 +8,8 @@ Release 0.2.0 - unreleased
IMPROVEMENTS
+ TAJO-44: Adopt AMRMClient to RMContainerAllocator, RMCommunicator. (hyunsik)
+
TAJO-42: Divide SubQuery into FSM and execution block parts. (hyunsik)
TAJO-32: Cleanup TaskRunner. (hyunsik)
@@ -30,7 +32,8 @@ Release 0.2.0 - unreleased
BUG FIXES
- TAJO-38: Update class comment in TaskAttemptContext from Korean to English (hsaputra)
+ TAJO-38: Update class comment in TaskAttemptContext from Korean to English
+ (hsaputra)
TAJO-15: The Integration test is getting hanged on Mac OS X. (hyunsik)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/39bb519c/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryMaster.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryMaster.java
index 98878d3..86581f0 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryMaster.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/QueryMaster.java
@@ -222,7 +222,6 @@ public class QueryMaster extends CompositeService implements EventHandler {
public class QueryContext {
private QueryConf conf;
- int clusterNode;
public Map<ContainerId, Container> containers = new ConcurrentHashMap<ContainerId, Container>();
int minCapability;
int maxCapability;
@@ -296,11 +295,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
}
public int getNumClusterNode() {
- return clusterNode;
- }
-
- public void setNumClusterNode(int num) {
- clusterNode = num;
+ return rmAllocator.getClusterNodeCount();
}
public CatalogService getCatalog() {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/39bb519c/tajo-core/tajo-core-backend/src/main/java/tajo/master/rm/RMCommunicator.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/rm/RMCommunicator.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/rm/RMCommunicator.java
deleted file mode 100644
index 90a0737..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/rm/RMCommunicator.java
+++ /dev/null
@@ -1,349 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.master.rm;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.yarn.YarnException;
-import org.apache.hadoop.yarn.api.AMRMProtocol;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.ContainerManager;
-import org.apache.hadoop.yarn.api.protocolrecords.*;
-import org.apache.hadoop.yarn.api.records.*;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
-import org.apache.hadoop.yarn.service.AbstractService;
-import org.apache.hadoop.yarn.util.BuilderUtils;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.util.ProtoUtils;
-import tajo.TajoProtos.QueryState;
-import tajo.master.Query;
-import tajo.master.QueryMaster.QueryContext;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.security.PrivilegedAction;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-public abstract class RMCommunicator extends AbstractService {
- private static final Log LOG = LogFactory.getLog(RMCommunicator.class);
-
- protected static final RecordFactory recordFactory =
- RecordFactoryProvider.getRecordFactory(null);
-
- // Resource Manager RPC
- private YarnRPC rpc;
- protected AMRMProtocol scheduler;
-
- // For Query
- protected QueryContext context;
- protected Query query;
- private int rmPollInterval = 1000;//millis
- protected ApplicationId applicationId;
- protected ApplicationAttemptId applicationAttemptId;
- protected Map<ApplicationAccessType, String> applicationACLs;
-
- // RMCommunicator
- private final AtomicBoolean stopped;
- protected Thread allocatorThread;
-
- // resource
- private Resource minContainerCapability;
- private Resource maxContainerCapability;
-
- // Has a signal (SIGTERM etc) been issued?
- protected volatile boolean isSignalled = false;
-
- public RMCommunicator(QueryContext context) {
- super(RMCommunicator.class.getName());
- this.context = context;
- this.applicationId = context.getApplicationId();
- this.applicationAttemptId = context.getApplicationAttemptId();
-
- stopped = new AtomicBoolean(false);
- }
-
- @Override
- public void init(Configuration conf) {
- LOG.info("defaultFS: " + conf.get("fs.default.name"));
- LOG.info("defaultFS: " + conf.get("fs.defaultFS"));
-
- super.init(conf);
- }
-
- public void start() {
- this.query = context.getQuery();
- rpc = YarnRPC.create(getConfig());
- this.scheduler = createSchedulerProxy();
-
-
- AllocateRequest allocateRequest = BuilderUtils.newAllocateRequest(
- applicationAttemptId, 0, 0.0f,
- new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>());
- try {
- AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
- context.setNumClusterNode(allocateResponse.getNumClusterNodes());
- } catch (YarnRemoteException e) {
- e.printStackTrace();
- }
- register();
- startAllocatorThread();
- super.start();
- }
-
- public void stop() {
- if (stopped.getAndSet(true)) {
- // return if already stopped
- return;
- }
- allocatorThread.interrupt();
- try {
- allocatorThread.join();
- } catch (InterruptedException ie) {
- LOG.warn("InterruptedException while stopping", ie);
- }
- unregister();
- super.stop();
- }
-
- protected void register() {
- //Register
- try {
- RegisterApplicationMasterRequest request =
- recordFactory.newRecordInstance(RegisterApplicationMasterRequest.class);
- request.setApplicationAttemptId(applicationAttemptId);
- LOG.info("Tracking Addr: " + context.getRpcAddress());
- request.setHost(context.getRpcAddress().getHostName());
- request.setRpcPort(context.getRpcAddress().getPort());
- // TODO - to be changed to http server
- //request.setTrackingUrl("http://" + NetUtils.getIpPortString(context.getRpcAddress()));
- request.setTrackingUrl("http://localhost:1234");
- RegisterApplicationMasterResponse response =
- scheduler.registerApplicationMaster(request);
- minContainerCapability = response.getMinimumResourceCapability();
- maxContainerCapability = response.getMaximumResourceCapability();
- context.setMaxContainerCapability(maxContainerCapability.getMemory());
- context.setMinContainerCapability(minContainerCapability.getMemory());
- this.applicationACLs = response.getApplicationACLs();
- LOG.info("minContainerCapability: " + minContainerCapability.getMemory());
- LOG.info("maxContainerCapability: " + maxContainerCapability.getMemory());
- } catch (Exception are) {
- LOG.error("Exception while registering", are);
- throw new YarnException(are);
- }
- }
-
- protected void unregister() {
- try {
- FinalApplicationStatus finishState = FinalApplicationStatus.UNDEFINED;
- if (query.getState() == QueryState.QUERY_SUCCEEDED) {
- finishState = FinalApplicationStatus.SUCCEEDED;
- } else if (query.getState() == QueryState.QUERY_KILLED
- || (query.getState() == QueryState.QUERY_RUNNING && isSignalled)) {
- finishState = FinalApplicationStatus.KILLED;
- } else if (query.getState() == QueryState.QUERY_FAILED
- || query.getState() == QueryState.QUERY_ERROR) {
- finishState = FinalApplicationStatus.FAILED;
- }
- StringBuffer sb = new StringBuffer();
-// for (String s : query.getDiagnostics()) {
-// sb.append(s).append("\n");
-// }
- LOG.info("Setting job diagnostics to " + sb.toString());
-
- // TODO - to be implemented
-// String historyUrl = JobHistoryUtils.getHistoryUrl(getConfig(),
-// context.getApplicationId());
-// LOG.info("History url is " + historyUrl);
-
- FinishApplicationMasterRequest request =
- recordFactory.newRecordInstance(FinishApplicationMasterRequest.class);
- request.setAppAttemptId(this.applicationAttemptId);
- request.setFinishApplicationStatus(finishState);
- request.setDiagnostics(""); // TODO - tobe implemented
- request.setTrackingUrl("");
- scheduler.finishApplicationMaster(request);
- } catch(Exception are) {
- LOG.error("Exception while unregistering ", are);
- }
- }
-
- protected Resource getMinContainerCapability() {
- return minContainerCapability;
- }
-
- protected Resource getMaxContainerCapability() {
- return maxContainerCapability;
- }
-
- public abstract void heartbeat() throws Exception;
-
- protected AMRMProtocol createSchedulerProxy() {
- final Configuration conf = getConfig();
- final InetSocketAddress serviceAddr = conf.getSocketAddr(
- YarnConfiguration.RM_SCHEDULER_ADDRESS,
- YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
- YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
-
- UserGroupInformation currentUser;
- try {
- currentUser = UserGroupInformation.getCurrentUser();
- } catch (IOException e) {
- throw new YarnException(e);
- }
-
- if (UserGroupInformation.isSecurityEnabled()) {
- String tokenURLEncodedStr = System.getenv().get(
- ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME);
- Token<? extends TokenIdentifier> token = new Token<TokenIdentifier>();
-
- try {
- token.decodeFromUrlString(tokenURLEncodedStr);
- } catch (IOException e) {
- throw new YarnException(e);
- }
-
- SecurityUtil.setTokenService(token, serviceAddr);
- if (LOG.isDebugEnabled()) {
- LOG.debug("AppMasterToken is " + token);
- }
- currentUser.addToken(token);
- }
-
- return currentUser.doAs(new PrivilegedAction<AMRMProtocol>() {
- @Override
- public AMRMProtocol run() {
- return (AMRMProtocol) rpc.getProxy(AMRMProtocol.class,
- serviceAddr, conf);
- }
- });
- }
-
- protected void startAllocatorThread() {
- allocatorThread = new Thread(new Runnable() {
- @Override
- public void run() {
- while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
- try {
- Thread.sleep(rmPollInterval);
- try {
- heartbeat();
- } catch (YarnException e) {
- LOG.error("Error communicating with RM: " + e.getMessage() , e);
- return;
- } catch (Exception e) {
- LOG.error("ERROR IN CONTACTING RM. ", e);
- // TODO: for other exceptions
- }
- } catch (InterruptedException e) {
- if (!stopped.get()) {
- LOG.warn("Allocated thread interrupted. Returning.");
- }
- return;
- }
- }
- }
- });
- allocatorThread.setName("RMCommunicator Allocator");
- allocatorThread.start();
- }
-
- public void setSignalled(boolean isSignalled) {
- this.isSignalled = isSignalled;
- LOG.info("RMCommunicator notified that iSignalled is: "
- + isSignalled);
- }
-
- protected ContainerManager getCMProxy(ContainerId containerID,
- final String containerManagerBindAddr, ContainerToken containerToken)
- throws IOException {
-
- final InetSocketAddress cmAddr =
- NetUtils.createSocketAddr(containerManagerBindAddr);
- UserGroupInformation user = UserGroupInformation.getCurrentUser();
-
- if (UserGroupInformation.isSecurityEnabled()) {
- Token<ContainerTokenIdentifier> token =
- ProtoUtils.convertFromProtoFormat(containerToken, cmAddr);
- // the user in createRemoteUser in this context has to be ContainerID
- user = UserGroupInformation.createRemoteUser(containerID.toString());
- user.addToken(token);
- }
-
- ContainerManager proxy = user
- .doAs(new PrivilegedAction<ContainerManager>() {
- @Override
- public ContainerManager run() {
- return (ContainerManager) rpc.getProxy(ContainerManager.class,
- cmAddr, getConfig());
- }
- });
- return proxy;
- }
-
- public static ContainerLaunchContext newContainerLaunchContext(
- ContainerId containerID, String user, Resource assignedCapability,
- Map<String, LocalResource> localResources,
- Map<String, String> environment, List<String> commands,
- Map<String, ByteBuffer> serviceData, ByteBuffer containerTokens,
- Map<ApplicationAccessType, String> acls) {
- ContainerLaunchContext container = recordFactory
- .newRecordInstance(ContainerLaunchContext.class);
- container.setContainerId(containerID);
- container.setUser(user);
- container.setResource(assignedCapability);
- container.setLocalResources(localResources);
- container.setEnvironment(environment);
- container.setCommands(commands);
- container.setServiceData(serviceData);
- container.setContainerTokens(containerTokens);
- container.setApplicationACLs(acls);
- return container;
- }
-
- private LocalResource createApplicationResource(FileContext fs, Path p, LocalResourceType type)
- throws IOException {
- LocalResource rsrc = recordFactory.newRecordInstance(LocalResource.class);
- FileStatus rsrcStat = fs.getFileStatus(p);
- rsrc.setResource(ConverterUtils.getYarnUrlFromPath(fs
- .getDefaultFileSystem().resolvePath(rsrcStat.getPath())));
- rsrc.setSize(rsrcStat.getLen());
- rsrc.setTimestamp(rsrcStat.getModificationTime());
- rsrc.setType(type);
- rsrc.setVisibility(LocalResourceVisibility.APPLICATION);
- return rsrc;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/39bb519c/tajo-core/tajo-core-backend/src/main/java/tajo/master/rm/RMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/master/rm/RMContainerAllocator.java b/tajo-core/tajo-core-backend/src/main/java/tajo/master/rm/RMContainerAllocator.java
index 7bb7bf0..4c78c26 100644
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/master/rm/RMContainerAllocator.java
+++ b/tajo-core/tajo-core-backend/src/main/java/tajo/master/rm/RMContainerAllocator.java
@@ -21,158 +21,126 @@ package tajo.master.rm;
import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.YarnException;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.AMResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.client.AMRMClientImpl;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
-import org.apache.hadoop.yarn.util.BuilderUtils;
-import org.apache.hadoop.yarn.util.Records;
import tajo.SubQueryId;
+import tajo.TajoProtos.QueryState;
import tajo.master.QueryMaster.QueryContext;
import tajo.master.SubQueryState;
-import tajo.master.event.*;
+import tajo.master.event.ContainerAllocationEvent;
+import tajo.master.event.ContainerAllocatorEventType;
+import tajo.master.event.SubQueryContainerAllocationEvent;
-import java.util.*;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicBoolean;
-public class RMContainerAllocator extends RMCommunicator
+public class RMContainerAllocator extends AMRMClientImpl
implements EventHandler<ContainerAllocationEvent> {
/** Class Logger */
private static final Log LOG = LogFactory.getLog(RMContainerAllocator.
class.getName());
+ private QueryContext context;
private final EventHandler eventHandler;
public RMContainerAllocator(QueryContext context) {
- super(context);
+ super(context.getApplicationAttemptId());
+ this.context = context;
this.eventHandler = context.getDispatcher().getEventHandler();
}
- private Map<Priority, SubQueryId> subQueryMap
- = new HashMap<Priority, SubQueryId>();
-
- @Override
- public void heartbeat() throws Exception {
- List<Container> allocatedContainers = getResources();
- Map<SubQueryId, List<Container>> allocated = new HashMap<SubQueryId, List<Container>>();
- if (allocatedContainers.size() > 0) {
- for (Container container : allocatedContainers) {
- SubQueryId subQueryId = subQueryMap.get(container.getPriority());
- if (!subQueryMap.containsKey(container.getPriority()) ||
- query.getSubQuery(subQueryId).getState() == SubQueryState.SUCCEEDED) {
- release.add(container.getId());
- synchronized (subQueryMap) {
- subQueryMap.remove(container.getPriority());
- }
- } else {
- if (allocated.containsKey(subQueryId)) {
- allocated.get(subQueryId).add(container);
- } else {
- allocated.put(subQueryId, Lists.newArrayList(container));
- }
- }
- }
-
- for (Entry<SubQueryId, List<Container>> entry : allocated.entrySet()) {
- eventHandler.handle(
- new SubQueryContainerAllocationEvent(entry.getKey(),
- entry.getValue()));
- }
- }
+ public void init(Configuration conf) {
+ super.init(conf);
}
- @Override
- public void handle(ContainerAllocationEvent event) {
-
- if (event.getType() == ContainerAllocatorEventType.CONTAINER_REQ) {
- LOG.info(event);
- assign(event);
+ public void start() {
+ super.start();
- } else if (event.getType() == ContainerAllocatorEventType.CONTAINER_DEALLOCATE) {
- LOG.info(event);
- } else {
- LOG.info(event);
+ RegisterApplicationMasterResponse response;
+ try {
+ response = registerApplicationMaster("locahost", 10080, "http://localhost:1234");
+ context.setMaxContainerCapability(response.getMaximumResourceCapability().getMemory());
+ context.setMinContainerCapability(response.getMinimumResourceCapability().getMemory());
+ } catch (YarnRemoteException e) {
+ LOG.error(e);
}
- }
- public void assign(ContainerAllocationEvent event) {
- SubQueryId subQueryId = event.getSubQueryId();
- subQueryMap.put(event.getPriority(), event.getSubQueryId());
+ startAllocatorThread();
+ }
- int memRequred;
- int minContainerCapability;
- int supportedMaxContainerCapability =
- getMaxContainerCapability().getMemory();
+ protected Thread allocatorThread;
+ private final AtomicBoolean stopped = new AtomicBoolean(false);
+ private int rmPollInterval = 1000;//millis
+ protected void startAllocatorThread() {
+ allocatorThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
+ try {
+ try {
+ heartbeat();
+ } catch (YarnException e) {
+ LOG.error("Error communicating with RM: " + e.getMessage() , e);
+ return;
+ } catch (Exception e) {
+ LOG.error("ERROR IN CONTACTING RM. ", e);
+ // TODO: for other exceptions
+ }
+ Thread.sleep(rmPollInterval);
+ } catch (InterruptedException e) {
+ if (!stopped.get()) {
+ LOG.warn("Allocated thread interrupted. Returning.");
+ }
+ return;
+ }
+ }
+ }
+ });
+ allocatorThread.setName("RMContainerAllocator");
+ allocatorThread.start();
+ }
- memRequred = event.getCapability().getMemory();
- minContainerCapability = getMinContainerCapability().getMemory();
- if (memRequred < minContainerCapability) {
- memRequred = minContainerCapability;
+ public void stop() {
+ stopped.set(true);
+ super.stop();
+ FinalApplicationStatus finishState = FinalApplicationStatus.UNDEFINED;
+ QueryState state = context.getQuery().getState();
+ if (state == QueryState.QUERY_SUCCEEDED) {
+ finishState = FinalApplicationStatus.SUCCEEDED;
+ } else if (state == QueryState.QUERY_KILLED
+ || (state == QueryState.QUERY_RUNNING)) {
+ finishState = FinalApplicationStatus.KILLED;
+ } else if (state == QueryState.QUERY_FAILED
+ || state == QueryState.QUERY_ERROR) {
+ finishState = FinalApplicationStatus.FAILED;
}
- if (memRequred > getMaxContainerCapability().getMemory()) {
- String diagMsg = "Task capability required is more than the supported " +
- "max container capability in the cluster. Killing the Job. mapResourceReqt: " +
- memRequred + " maxContainerCapability:" + supportedMaxContainerCapability;
- LOG.info(diagMsg);
- eventHandler.handle(new QueryDiagnosticsUpdateEvent(
- subQueryId.getQueryId(), diagMsg));
- eventHandler.handle(new QueryEvent(subQueryId.getQueryId(),
- QueryEventType.KILL));
+ try {
+ unregisterApplicationMaster(finishState, "", "http://localhost:1234");
+ } catch (YarnRemoteException e) {
+ LOG.error(e);
}
- LOG.info("mapResourceReqt:"+memRequred);
- /*
- if (event.isLeafQuery() && event instanceof GrouppedContainerAllocatorEvent) {
- GrouppedContainerAllocatorEvent allocatorEvent =
- (GrouppedContainerAllocatorEvent) event;
- List<ResourceRequest> requestList = new ArrayList<>();
- for (Entry<String, Integer> request :
- allocatorEvent.getRequestMap().entrySet()) {
-
- ResourceRequest resReq = Records.newRecord(ResourceRequest.class);
- // TODO - to consider the data locality
- resReq.setHostName("*");
- resReq.setCapability(allocatorEvent.getCapability());
- resReq.setNumContainers(request.getValue());
- resReq.setPriority(allocatorEvent.getPriority());
- requestList.add(resReq);
- }
-
- ask.addAll(new ArrayList<>(requestList));
- LOG.info(requestList.size());
- LOG.info(ask.size());
- } else {*/
- ResourceRequest resReq = Records.newRecord(ResourceRequest.class);
- resReq.setHostName("*");
- resReq.setCapability(event.getCapability());
- resReq.setNumContainers(event.getRequiredNum());
- resReq.setPriority(event.getPriority());
- ask.add(resReq);
- //}
}
- Set<ResourceRequest> ask = new HashSet<ResourceRequest>();
- Set<ContainerId> release = new HashSet<ContainerId>();
- Resource availableResources;
- int lastClusterNmCount = 0;
- int clusterNmCount = 0;
- int lastResponseID = 1;
+ private final Map<Priority, SubQueryId> subQueryMap =
+ new HashMap<Priority, SubQueryId>();
- protected AMResponse makeRemoteRequest() throws YarnException, YarnRemoteException {
- AllocateRequest allocateRequest = BuilderUtils.newAllocateRequest(
- applicationAttemptId, lastResponseID, query.getProgress(),
- new ArrayList<ResourceRequest>(ask), new ArrayList<ContainerId>(release));
- AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
- AMResponse response = allocateResponse.getAMResponse();
- lastResponseID = response.getResponseId();
- availableResources = response.getAvailableResources();
- lastClusterNmCount = clusterNmCount;
- clusterNmCount = allocateResponse.getNumClusterNodes();
+ public void heartbeat() throws Exception {
+ AMResponse response = allocate(context.getProgress()).getAMResponse();
+ List<Container> allocatedContainers = response.getAllocatedContainers();
- //LOG.info("Response Id: " + response.getResponseId());
LOG.info("Available Resource: " + response.getAvailableResources());
LOG.info("Num of Allocated Containers: " + response.getAllocatedContainers().size());
if (response.getAllocatedContainers().size() > 0) {
@@ -186,79 +154,45 @@ public class RMContainerAllocator extends RMCommunicator
}
LOG.info("================================================================");
}
- /*
- LOG.info("Reboot: " + response.getReboot());
- LOG.info("Num of Updated Node: " + response.getUpdatedNodes());
- for (NodeReport nodeReport : response.getUpdatedNodes()) {
- LOG.info("> Node Id: " + nodeReport.getNodeId());
- LOG.info("> Node State: " + nodeReport.getNodeState());
- LOG.info("> Rack Name: " + nodeReport.getRackName());
- LOG.info("> Used: " + nodeReport.getUsed());
- }
- */
+ Map<SubQueryId, List<Container>> allocated = new HashMap<SubQueryId, List<Container>>();
+ if (allocatedContainers.size() > 0) {
+ for (Container container : allocatedContainers) {
+ SubQueryId subQueryId = subQueryMap.get(container.getPriority());
+ if (!subQueryMap.containsKey(container.getPriority()) ||
+ context.getSubQuery(subQueryId).getState() == SubQueryState.SUCCEEDED) {
+ releaseAssignedContainer(container.getId());
+ synchronized (subQueryMap) {
+ subQueryMap.remove(container.getPriority());
+ }
+ } else {
+ if (allocated.containsKey(subQueryId)) {
+ allocated.get(subQueryId).add(container);
+ } else {
+ allocated.put(subQueryId, Lists.newArrayList(container));
+ }
+ }
+ }
- if (ask.size() > 0 || release.size() > 0) {
- LOG.info("getResources() for " + applicationId + ":" + " ask="
- + ask.size() + " release= " + release.size() + " newContainers="
- + response.getAllocatedContainers().size() + " finishedContainers="
- + response.getCompletedContainersStatuses().size()
- + " resourcelimit=" + availableResources + " knownNMs="
- + clusterNmCount);
+ for (Entry<SubQueryId, List<Container>> entry : allocated.entrySet()) {
+ eventHandler.handle(new SubQueryContainerAllocationEvent(entry.getKey(), entry.getValue()));
+ }
}
-
- ask.clear();
- release.clear();
- return response;
- }
-
- public Resource getAvailableResources() {
- return availableResources;
}
+ @Override
+ public void handle(ContainerAllocationEvent event) {
- long retrystartTime;
- long retryInterval = 3000;
- private List<Container> getResources() throws Exception {
- int headRoom = getAvailableResources() != null ? getAvailableResources().getMemory() : 0;//first time it would be null
- AMResponse response;
-
- /*
- * If contact with RM is lost, the AM will wait MR_AM_TO_RM_WAIT_INTERVAL_MS
- * milliseconds before aborting. During this interval, AM will still try
- * to contact the RM.
- */
- try {
- response = makeRemoteRequest();
- // Reset retry count if no exception occurred.
- retrystartTime = System.currentTimeMillis();
- } catch (Exception e) {
- // This can happen when the connection to the RM has gone down. Keep
- // re-trying until the retryInterval has expired.
- if (System.currentTimeMillis() - retrystartTime >= retryInterval) {
- LOG.error("Could not contact RM after " + retryInterval + " milliseconds.");
- eventHandler.handle(new QueryEvent(query.getId(),
- QueryEventType.INTERNAL_ERROR));
- throw new YarnException("Could not contact RM after " +
- retryInterval + " milliseconds.");
- }
- // Throw this up to the caller, which may decide to ignore it and
- // continue to attempt to contact the RM.
- throw e;
- }
+ if (event.getType() == ContainerAllocatorEventType.CONTAINER_REQ) {
+ LOG.info(event);
+ subQueryMap.put(event.getPriority(), event.getSubQueryId());
+ addContainerRequest(new ContainerRequest(event.getCapability(), null, null,
+ event.getPriority(), event.getRequiredNum()));
- if (response.getReboot()) {
- // This can happen if the RM has been restarted. If it is in that state,
- // this application must clean itself up.
- eventHandler.handle(new QueryEvent(query.getId(),
- QueryEventType.INTERNAL_ERROR));
- throw new YarnException("Resource Manager doesn't recognize AttemptId: " +
- context.getApplicationId());
+ } else if (event.getType() == ContainerAllocatorEventType.CONTAINER_DEALLOCATE) {
+ LOG.info(event);
+ } else {
+ LOG.info(event);
}
-
- int newHeadRoom = getAvailableResources() != null ? getAvailableResources().getMemory() : 0;
- List<Container> newContainers = response.getAllocatedContainers();
-
- return newContainers;
}
}