You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2014/04/22 08:06:45 UTC
[48/50] [abbrv] git commit: Workaround for YARN-314 by not mixing new
container request with remove container request (request with container count
= 0).
Workaround for YARN-314 by not mixing new container request with remove container request (request with container count = 0).
Signed-off-by: Terence Yim <ch...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/2c3cf396
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/2c3cf396
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/2c3cf396
Branch: refs/heads/site
Commit: 2c3cf39682415420a89f0360c0abe9a42fdc4abe
Parents: d6504eb
Author: Terence Yim <te...@continuuity.com>
Authored: Wed Apr 16 17:06:25 2014 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Thu Apr 17 00:26:25 2014 -0700
----------------------------------------------------------------------
.../internal/yarn/Hadoop20YarnAMClient.java | 139 ++++-------
.../internal/yarn/Hadoop21YarnAMClient.java | 100 ++------
.../appmaster/ApplicationMasterService.java | 6 +-
.../internal/yarn/AbstractYarnAMClient.java | 229 +++++++++++++++++++
.../twill/internal/yarn/YarnAMClient.java | 4 +-
.../apache/twill/yarn/ContainerSizeTestRun.java | 119 ++++++++++
.../apache/twill/yarn/LogHandlerTestRun.java | 3 +
7 files changed, 420 insertions(+), 180 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/2c3cf396/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
index 68d073d..9b66f67 100644
--- a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
+++ b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
@@ -19,23 +19,18 @@ package org.apache.twill.internal.yarn;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
-import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
-import com.google.common.collect.Multimap;
-import com.google.common.util.concurrent.AbstractIdleService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.twill.internal.ProcessLauncher;
import org.apache.twill.internal.appmaster.RunnableProcessLauncher;
import org.apache.twill.internal.yarn.ports.AMRMClient;
import org.apache.twill.internal.yarn.ports.AMRMClientImpl;
@@ -43,15 +38,13 @@ import org.apache.twill.internal.yarn.ports.AllocationResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.net.InetSocketAddress;
-import java.net.URL;
import java.util.List;
-import java.util.UUID;
+import javax.annotation.Nullable;
/**
*
*/
-public final class Hadoop20YarnAMClient extends AbstractIdleService implements YarnAMClient {
+public final class Hadoop20YarnAMClient extends AbstractYarnAMClient<AMRMClient.ContainerRequest> {
private static final Logger LOG = LoggerFactory.getLogger(Hadoop20YarnAMClient.class);
private static final Function<ContainerStatus, YarnContainerStatus> STATUS_TRANSFORM;
@@ -65,21 +58,13 @@ public final class Hadoop20YarnAMClient extends AbstractIdleService implements Y
};
}
- private final ContainerId containerId;
- private final Multimap<String, AMRMClient.ContainerRequest> containerRequests;
private final AMRMClient amrmClient;
private final YarnNMClient nmClient;
- private InetSocketAddress trackerAddr;
- private URL trackerUrl;
private Resource maxCapability;
private Resource minCapability;
public Hadoop20YarnAMClient(Configuration conf) {
- String masterContainerId = System.getenv().get(ApplicationConstants.AM_CONTAINER_ID_ENV);
- Preconditions.checkArgument(masterContainerId != null,
- "Missing %s from environment", ApplicationConstants.AM_CONTAINER_ID_ENV);
- this.containerId = ConverterUtils.toContainerId(masterContainerId);
- this.containerRequests = ArrayListMultimap.create();
+ super(ApplicationConstants.AM_CONTAINER_ID_ENV);
this.amrmClient = new AMRMClientImpl(containerId.getApplicationAttemptId());
this.amrmClient.init(conf);
@@ -110,107 +95,67 @@ public final class Hadoop20YarnAMClient extends AbstractIdleService implements Y
}
@Override
- public ContainerId getContainerId() {
- return containerId;
- }
-
- @Override
public String getHost() {
return System.getenv().get(ApplicationConstants.NM_HOST_ENV);
}
@Override
- public void setTracker(InetSocketAddress trackerAddr, URL trackerUrl) {
- this.trackerAddr = trackerAddr;
- this.trackerUrl = trackerUrl;
- }
-
- @Override
- public synchronized void allocate(float progress, AllocateHandler handler) throws Exception {
- AllocationResponse response = amrmClient.allocate(progress);
- List<ProcessLauncher<YarnContainerInfo>> launchers
- = Lists.newArrayListWithCapacity(response.getAllocatedContainers().size());
-
- for (Container container : response.getAllocatedContainers()) {
- launchers.add(new RunnableProcessLauncher(new Hadoop20YarnContainerInfo(container), nmClient));
+ protected Resource adjustCapability(Resource resource) {
+ int cores = YarnUtils.getVirtualCores(resource);
+ int updatedCores = Math.max(Math.min(cores, YarnUtils.getVirtualCores(maxCapability)),
+ YarnUtils.getVirtualCores(minCapability));
+ // Try and set the virtual cores, which older versions of YARN don't support this.
+ if (cores != updatedCores && YarnUtils.setVirtualCores(resource, updatedCores)) {
+ LOG.info("Adjust virtual cores requirement from {} to {}.", cores, updatedCores);
}
- if (!launchers.isEmpty()) {
- handler.acquired(launchers);
-
- // If no process has been launched through the given launcher, return the container.
- for (ProcessLauncher<YarnContainerInfo> l : launchers) {
- // This cast always works.
- RunnableProcessLauncher launcher = (RunnableProcessLauncher) l;
- if (!launcher.isLaunched()) {
- Container container = launcher.getContainerInfo().getContainer();
- LOG.info("Nothing to run in container, releasing it: {}", container);
- amrmClient.releaseAssignedContainer(container.getId());
- }
- }
- }
+ int updatedMemory = Math.min(resource.getMemory(), maxCapability.getMemory());
+ int minMemory = minCapability.getMemory();
+ updatedMemory = (int) Math.ceil(((double) updatedMemory / minMemory)) * minMemory;
- List<YarnContainerStatus> completed = ImmutableList.copyOf(
- Iterables.transform(response.getCompletedContainersStatuses(), STATUS_TRANSFORM));
- if (!completed.isEmpty()) {
- handler.completed(completed);
+ if (resource.getMemory() != updatedMemory) {
+ resource.setMemory(updatedMemory);
+ LOG.info("Adjust memory requirement from {} to {} MB.", resource.getMemory(), updatedMemory);
}
+
+ return resource;
}
@Override
- public ContainerRequestBuilder addContainerRequest(Resource capability) {
- return addContainerRequest(capability, 1);
+ protected AMRMClient.ContainerRequest createContainerRequest(Priority priority, Resource capability,
+ @Nullable String[] hosts, @Nullable String[] racks) {
+ return new AMRMClient.ContainerRequest(capability, hosts, racks, priority, 1);
}
@Override
- public ContainerRequestBuilder addContainerRequest(Resource capability, int count) {
- return new ContainerRequestBuilder(adjustCapability(capability), count) {
- @Override
- public String apply() {
- synchronized (Hadoop20YarnAMClient.this) {
- String id = UUID.randomUUID().toString();
-
- String[] hosts = this.hosts.isEmpty() ? null : this.hosts.toArray(new String[this.hosts.size()]);
- String[] racks = this.racks.isEmpty() ? null : this.racks.toArray(new String[this.racks.size()]);
-
- for (int i = 0; i < count; i++) {
- AMRMClient.ContainerRequest request = new AMRMClient.ContainerRequest(capability, hosts, racks,
- priority, 1);
- containerRequests.put(id, request);
- amrmClient.addContainerRequest(request);
- }
-
- return id;
- }
- }
- };
+ protected void addContainerRequest(AMRMClient.ContainerRequest request) {
+ amrmClient.addContainerRequest(request);
}
@Override
- public synchronized void completeContainerRequest(String id) {
- for (AMRMClient.ContainerRequest request : containerRequests.removeAll(id)) {
- amrmClient.removeContainerRequest(request);
- }
+ protected void removeContainerRequest(AMRMClient.ContainerRequest request) {
+ amrmClient.removeContainerRequest(request);
}
- private Resource adjustCapability(Resource resource) {
- int cores = YarnUtils.getVirtualCores(resource);
- int updatedCores = Math.max(Math.min(cores, YarnUtils.getVirtualCores(maxCapability)),
- YarnUtils.getVirtualCores(minCapability));
- // Try and set the virtual cores, which older versions of YARN don't support this.
- if (cores != updatedCores && YarnUtils.setVirtualCores(resource, updatedCores)) {
- LOG.info("Adjust virtual cores requirement from {} to {}.", cores, updatedCores);
+ @Override
+ protected AllocateResult doAllocate(float progress) throws Exception {
+ AllocationResponse response = amrmClient.allocate(progress);
+ List<RunnableProcessLauncher> launchers
+ = Lists.newArrayListWithCapacity(response.getAllocatedContainers().size());
+
+ for (Container container : response.getAllocatedContainers()) {
+ launchers.add(new RunnableProcessLauncher(new Hadoop20YarnContainerInfo(container), nmClient));
}
- int updatedMemory = Math.min(resource.getMemory(), maxCapability.getMemory());
- int minMemory = minCapability.getMemory();
- updatedMemory = (int) Math.ceil(((double) updatedMemory / minMemory)) * minMemory;
+ List<YarnContainerStatus> completed = ImmutableList.copyOf(
+ Iterables.transform(response.getCompletedContainersStatuses(), STATUS_TRANSFORM));
- if (resource.getMemory() != updatedMemory) {
- resource.setMemory(updatedMemory);
- LOG.info("Adjust memory requirement from {} to {} MB.", resource.getMemory(), updatedMemory);
- }
+ return new AllocateResult(launchers, completed);
+ }
- return resource;
+ @Override
+ protected void releaseAssignedContainer(YarnContainerInfo containerInfo) {
+ Container container = containerInfo.getContainer();
+ amrmClient.releaseAssignedContainer(container.getId());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/2c3cf396/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java
index 0ebc0f5..78a5135 100644
--- a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java
+++ b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java
@@ -19,37 +19,30 @@ package org.apache.twill.internal.yarn;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
-import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
-import com.google.common.collect.Multimap;
-import com.google.common.util.concurrent.AbstractIdleService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.twill.internal.ProcessLauncher;
import org.apache.twill.internal.appmaster.RunnableProcessLauncher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.net.InetSocketAddress;
-import java.net.URL;
import java.util.List;
-import java.util.UUID;
+import javax.annotation.Nullable;
/**
*
*/
-public final class Hadoop21YarnAMClient extends AbstractIdleService implements YarnAMClient {
+public final class Hadoop21YarnAMClient extends AbstractYarnAMClient<AMRMClient.ContainerRequest> {
private static final Logger LOG = LoggerFactory.getLogger(Hadoop21YarnAMClient.class);
@@ -64,20 +57,12 @@ public final class Hadoop21YarnAMClient extends AbstractIdleService implements Y
};
}
- private final ContainerId containerId;
- private final Multimap<String, AMRMClient.ContainerRequest> containerRequests;
private final AMRMClient<AMRMClient.ContainerRequest> amrmClient;
private final Hadoop21YarnNMClient nmClient;
- private InetSocketAddress trackerAddr;
- private URL trackerUrl;
private Resource maxCapability;
public Hadoop21YarnAMClient(Configuration conf) {
- String masterContainerId = System.getenv().get(ApplicationConstants.Environment.CONTAINER_ID.name());
- Preconditions.checkArgument(masterContainerId != null,
- "Missing %s from environment", ApplicationConstants.Environment.CONTAINER_ID.name());
- this.containerId = ConverterUtils.toContainerId(masterContainerId);
- this.containerRequests = ArrayListMultimap.create();
+ super(ApplicationConstants.Environment.CONTAINER_ID.name());
this.amrmClient = AMRMClient.createAMRMClient();
this.amrmClient.init(conf);
@@ -105,89 +90,50 @@ public final class Hadoop21YarnAMClient extends AbstractIdleService implements Y
}
@Override
- public ContainerId getContainerId() {
- return containerId;
+ public String getHost() {
+ return System.getenv().get(ApplicationConstants.Environment.NM_HOST.name());
}
@Override
- public String getHost() {
- return System.getenv().get(ApplicationConstants.Environment.NM_HOST.name());
+ protected AMRMClient.ContainerRequest createContainerRequest(Priority priority, Resource capability,
+ @Nullable String[] hosts, @Nullable String[] racks) {
+ return new AMRMClient.ContainerRequest(capability, hosts, racks, priority);
}
@Override
- public void setTracker(InetSocketAddress trackerAddr, URL trackerUrl) {
- this.trackerAddr = trackerAddr;
- this.trackerUrl = trackerUrl;
+ protected void addContainerRequest(AMRMClient.ContainerRequest request) {
+ amrmClient.addContainerRequest(request);
}
@Override
- public synchronized void allocate(float progress, AllocateHandler handler) throws Exception {
+ protected void removeContainerRequest(AMRMClient.ContainerRequest request) {
+ amrmClient.removeContainerRequest(request);
+ }
+
+ @Override
+ protected AllocateResult doAllocate(float progress) throws Exception {
AllocateResponse allocateResponse = amrmClient.allocate(progress);
- List<ProcessLauncher<YarnContainerInfo>> launchers
+ List<RunnableProcessLauncher> launchers
= Lists.newArrayListWithCapacity(allocateResponse.getAllocatedContainers().size());
for (Container container : allocateResponse.getAllocatedContainers()) {
launchers.add(new RunnableProcessLauncher(new Hadoop21YarnContainerInfo(container), nmClient));
}
- if (!launchers.isEmpty()) {
- handler.acquired(launchers);
-
- // If no process has been launched through the given launcher, return the container.
- for (ProcessLauncher<YarnContainerInfo> l : launchers) {
- // This cast always works.
- RunnableProcessLauncher launcher = (RunnableProcessLauncher) l;
- if (!launcher.isLaunched()) {
- Container container = launcher.getContainerInfo().getContainer();
- LOG.info("Nothing to run in container, releasing it: {}", container);
- amrmClient.releaseAssignedContainer(container.getId());
- }
- }
- }
-
List<YarnContainerStatus> completed = ImmutableList.copyOf(
Iterables.transform(allocateResponse.getCompletedContainersStatuses(), STATUS_TRANSFORM));
- if (!completed.isEmpty()) {
- handler.completed(completed);
- }
- }
- @Override
- public ContainerRequestBuilder addContainerRequest(Resource capability) {
- return addContainerRequest(capability, 1);
+ return new AllocateResult(launchers, completed);
}
@Override
- public ContainerRequestBuilder addContainerRequest(Resource capability, int count) {
- return new ContainerRequestBuilder(adjustCapability(capability), count) {
- @Override
- public String apply() {
- synchronized (Hadoop21YarnAMClient.this) {
- String id = UUID.randomUUID().toString();
-
- String[] hosts = this.hosts.isEmpty() ? null : this.hosts.toArray(new String[this.hosts.size()]);
- String[] racks = this.racks.isEmpty() ? null : this.racks.toArray(new String[this.racks.size()]);
-
- for (int i = 0; i < count; i++) {
- AMRMClient.ContainerRequest request = new AMRMClient.ContainerRequest(capability, hosts, racks, priority);
- containerRequests.put(id, request);
- amrmClient.addContainerRequest(request);
- }
-
- return id;
- }
- }
- };
+ protected void releaseAssignedContainer(YarnContainerInfo containerInfo) {
+ Container container = containerInfo.getContainer();
+ amrmClient.releaseAssignedContainer(container.getId());
}
@Override
- public synchronized void completeContainerRequest(String id) {
- for (AMRMClient.ContainerRequest request : containerRequests.removeAll(id)) {
- amrmClient.removeContainerRequest(request);
- }
- }
-
- private Resource adjustCapability(Resource resource) {
+ protected Resource adjustCapability(Resource resource) {
int cores = resource.getVirtualCores();
int updatedCores = Math.min(resource.getVirtualCores(), maxCapability.getVirtualCores());
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/2c3cf396/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
index f1ad20e..2b7f049 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
@@ -291,7 +291,7 @@ public final class ApplicationMasterService extends AbstractTwillService {
final Set<String> ids = Sets.newHashSet(runningContainers.getContainerIds());
YarnAMClient.AllocateHandler handler = new YarnAMClient.AllocateHandler() {
@Override
- public void acquired(List<ProcessLauncher<YarnContainerInfo>> launchers) {
+ public void acquired(List<? extends ProcessLauncher<YarnContainerInfo>> launchers) {
// no-op
}
@@ -352,7 +352,7 @@ public final class ApplicationMasterService extends AbstractTwillService {
YarnAMClient.AllocateHandler allocateHandler = new YarnAMClient.AllocateHandler() {
@Override
- public void acquired(List<ProcessLauncher<YarnContainerInfo>> launchers) {
+ public void acquired(List<? extends ProcessLauncher<YarnContainerInfo>> launchers) {
launchRunnable(launchers, provisioning);
}
@@ -525,7 +525,7 @@ public final class ApplicationMasterService extends AbstractTwillService {
/**
* Launches runnables in the provisioned containers.
*/
- private void launchRunnable(List<ProcessLauncher<YarnContainerInfo>> launchers,
+ private void launchRunnable(List<? extends ProcessLauncher<YarnContainerInfo>> launchers,
Queue<ProvisionRequest> provisioning) {
for (ProcessLauncher<YarnContainerInfo> processLauncher : launchers) {
LOG.info("Got container {}", processLauncher.getContainerInfo().getId());
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/2c3cf396/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnAMClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnAMClient.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnAMClient.java
new file mode 100644
index 0000000..9718150
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnAMClient.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import com.google.common.util.concurrent.AbstractIdleService;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.twill.internal.ProcessLauncher;
+import org.apache.twill.internal.appmaster.RunnableProcessLauncher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.List;
+import java.util.UUID;
+import javax.annotation.Nullable;
+
+/**
+ * Abstract base for implementing YarnAMClient for different versions of hadoop.
+ *
+ * @param <T> Type of container request.
+ */
+public abstract class AbstractYarnAMClient<T> extends AbstractIdleService implements YarnAMClient {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractYarnAMClient.class);
+
+ // Map from a unique ID to inflight requests
+ private final Multimap<String, T> containerRequests;
+
+ // List of requests pending to send through allocate call
+ private final List<T> requests;
+ // List of requests pending to remove through allocate call
+ private final List<T> removes;
+
+ protected final ContainerId containerId;
+ protected InetSocketAddress trackerAddr;
+ protected URL trackerUrl;
+
+ /**
+ * Constructs an instance of AMClient.
+ *
+ * @param containerIdEnvName Name of the environment variable that contains value of the AM container ID.
+ */
+ protected AbstractYarnAMClient(String containerIdEnvName) {
+ String masterContainerId = System.getenv().get(containerIdEnvName);
+ Preconditions.checkArgument(masterContainerId != null,
+ "Missing %s from environment", containerIdEnvName);
+ this.containerId = ConverterUtils.toContainerId(masterContainerId);
+ this.containerRequests = ArrayListMultimap.create();
+ this.requests = Lists.newLinkedList();
+ this.removes = Lists.newLinkedList();
+ }
+
+
+ @Override
+ public final ContainerId getContainerId() {
+ return containerId;
+ }
+
+ @Override
+ public final void setTracker(InetSocketAddress trackerAddr, URL trackerUrl) {
+ this.trackerAddr = trackerAddr;
+ this.trackerUrl = trackerUrl;
+ }
+
+ @Override
+ public final synchronized void allocate(float progress, AllocateHandler handler) throws Exception {
+ // In one allocate cycle, either only do new container request or removal of requests.
+ // This is a workaround for YARN-314.
+ // When remove a container request, AMRMClient will send a container request with size = 0
+ // With bug YARN-314, if we mix the allocate call with new container request of the same priority,
+ // in some cases the RM would not see the new request (based on sorting of resource capability),
+ // but rather only see the one with size = 0.
+ if (removes.isEmpty()) {
+ for (T request : requests) {
+ addContainerRequest(request);
+ }
+ requests.clear();
+ } else {
+ for (T request : removes) {
+ removeContainerRequest(request);
+ }
+ removes.clear();
+ }
+
+ AllocateResult allocateResponse = doAllocate(progress);
+ List<RunnableProcessLauncher> launchers = allocateResponse.getLaunchers();
+
+ if (!launchers.isEmpty()) {
+ handler.acquired(launchers);
+
+ // If no process has been launched through the given launcher, return the container.
+ for (ProcessLauncher<YarnContainerInfo> l : launchers) {
+ // This cast always works.
+ RunnableProcessLauncher launcher = (RunnableProcessLauncher) l;
+ if (!launcher.isLaunched()) {
+ YarnContainerInfo containerInfo = launcher.getContainerInfo();
+ LOG.info("Nothing to run in container, releasing it: {}", containerInfo.getContainer());
+ releaseAssignedContainer(containerInfo);
+ }
+ }
+ }
+
+ List<YarnContainerStatus> completed = allocateResponse.getCompletedStatus();
+ if (!completed.isEmpty()) {
+ handler.completed(completed);
+ }
+ }
+
+ @Override
+ public final ContainerRequestBuilder addContainerRequest(Resource capability) {
+ return addContainerRequest(capability, 1);
+ }
+
+ @Override
+ public final ContainerRequestBuilder addContainerRequest(Resource capability, int count) {
+ return new ContainerRequestBuilder(adjustCapability(capability), count) {
+ @Override
+ public String apply() {
+ synchronized (AbstractYarnAMClient.this) {
+ String id = UUID.randomUUID().toString();
+
+ String[] hosts = this.hosts.isEmpty() ? null : this.hosts.toArray(new String[this.hosts.size()]);
+ String[] racks = this.racks.isEmpty() ? null : this.racks.toArray(new String[this.racks.size()]);
+
+ for (int i = 0; i < count; i++) {
+ T request = createContainerRequest(priority, capability, hosts, racks);
+ containerRequests.put(id, request);
+ requests.add(request);
+ }
+
+ return id;
+ }
+ }
+ };
+
+ }
+
+ @Override
+ public final synchronized void completeContainerRequest(String id) {
+ for (T request : containerRequests.removeAll(id)) {
+ removes.add(request);
+ }
+ }
+
+ /**
+ * Adjusts the given resource capability to fit in the cluster limit.
+ *
+ * @param capability The capability to be adjusted.
+ * @return A {@link Resource} instance representing the adjusted result.
+ */
+ protected abstract Resource adjustCapability(Resource capability);
+
+ /**
+ * Creates a container request based on the given requirement.
+ *
+ * @param priority The priority of the request.
+ * @param capability The resource capability.
+ * @param hosts Sets of hosts. Could be {@code null}.
+ * @param racks Sets of racks. Could be {@code null}.
+ * @return A container request.
+ */
+ protected abstract T createContainerRequest(Priority priority, Resource capability,
+ @Nullable String[] hosts, @Nullable String[] racks);
+
+ /**
+ * Adds the given request to prepare for next allocate call.
+ */
+ protected abstract void addContainerRequest(T request);
+
+ /**
+ * Removes the given request to prepare for the next allocate call.
+ */
+ protected abstract void removeContainerRequest(T request);
+
+ /**
+ * Performs actual allocate call to RM.
+ */
+ protected abstract AllocateResult doAllocate(float progress) throws Exception;
+
+ /**
+ * Releases the given container back to RM.
+ */
+ protected abstract void releaseAssignedContainer(YarnContainerInfo containerInfo);
+
+ /**
+ * Class for carrying results for the {@link #doAllocate(float)} call.
+ */
+ protected static final class AllocateResult {
+ private final List<RunnableProcessLauncher> launchers;
+ private final List<YarnContainerStatus> completedStatus;
+
+ public AllocateResult(List<RunnableProcessLauncher> launchers, List<YarnContainerStatus> completedStatus) {
+ this.launchers = ImmutableList.copyOf(launchers);
+ this.completedStatus = ImmutableList.copyOf(completedStatus);
+ }
+
+ public List<RunnableProcessLauncher> getLaunchers() {
+ return launchers;
+ }
+
+ public List<YarnContainerStatus> getCompletedStatus() {
+ return completedStatus;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/2c3cf396/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClient.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClient.java
index 370ca3c..6a5ee36 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClient.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClient.java
@@ -19,7 +19,6 @@ package org.apache.twill.internal.yarn;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Service;
-import org.apache.commons.lang.ArrayUtils;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -29,7 +28,6 @@ import org.apache.twill.internal.ProcessLauncher;
import java.net.InetSocketAddress;
import java.net.URL;
import java.util.Collection;
-import java.util.Collections;
import java.util.List;
import java.util.Set;
@@ -95,7 +93,7 @@ public interface YarnAMClient extends Service {
*/
// TODO: Move AM heartbeat logic into this interface so AM only needs to handle callback.
interface AllocateHandler {
- void acquired(List<ProcessLauncher<YarnContainerInfo>> launchers);
+ void acquired(List<? extends ProcessLauncher<YarnContainerInfo>> launchers);
void completed(List<YarnContainerStatus> completed);
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/2c3cf396/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java
new file mode 100644
index 0000000..42d332d
--- /dev/null
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/ContainerSizeTestRun.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.twill.yarn;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.twill.api.AbstractTwillRunnable;
+import org.apache.twill.api.ResourceSpecification;
+import org.apache.twill.api.TwillApplication;
+import org.apache.twill.api.TwillController;
+import org.apache.twill.api.TwillRunner;
+import org.apache.twill.api.TwillSpecification;
+import org.apache.twill.api.logging.PrinterLogHandler;
+import org.apache.twill.discovery.ServiceDiscovered;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.PrintWriter;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test for requesting different container size in different order.
+ * It specifically test for workaround for YARN-314.
+ */
+public class ContainerSizeTestRun extends BaseYarnTest {
+
+ @Test
+ public void testContainerSize() throws InterruptedException {
+ TwillRunner runner = YarnTestUtils.getTwillRunner();
+ TwillController controller = runner.prepare(new SleepApp())
+ .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
+ .start();
+
+ try {
+ ServiceDiscovered discovered = controller.discoverService("sleep");
+ Assert.assertTrue(YarnTestUtils.waitForSize(discovered, 2, 60));
+ } finally {
+ controller.stopAndWait();
+ }
+ }
+
+
+ /**
+ * An application that has two runnables with different memory size.
+ */
+ public static final class SleepApp implements TwillApplication {
+
+ @Override
+ public TwillSpecification configure() {
+ ResourceSpecification largeRes = ResourceSpecification.Builder.with()
+ .setVirtualCores(1)
+ .setMemory(1024, ResourceSpecification.SizeUnit.MEGA)
+ .build();
+
+ ResourceSpecification smallRes = ResourceSpecification.Builder.with()
+ .setVirtualCores(1)
+ .setMemory(512, ResourceSpecification.SizeUnit.MEGA)
+ .build();
+
+ return TwillSpecification.Builder.with()
+ .setName("SleepApp")
+ .withRunnable()
+ .add("sleep1", new SleepRunnable(12345), largeRes).noLocalFiles()
+ .add("sleep2", new SleepRunnable(12346), smallRes).noLocalFiles()
+ .withOrder()
+ .begin("sleep1")
+ .nextWhenStarted("sleep2")
+ .build();
+ }
+ }
+
+
+ /**
+ * A runnable that sleep for 30 seconds.
+ */
+ public static final class SleepRunnable extends AbstractTwillRunnable {
+
+ private volatile Thread runThread;
+
+ public SleepRunnable(int port) {
+ super(ImmutableMap.of("port", Integer.toString(port)));
+ }
+
+ @Override
+ public void run() {
+ runThread = Thread.currentThread();
+ Random random = new Random();
+ getContext().announce("sleep", Integer.parseInt(getContext().getSpecification().getConfigs().get("port")));
+ try {
+ TimeUnit.SECONDS.sleep(30);
+ } catch (InterruptedException e) {
+ // Ignore.
+ }
+ }
+
+ @Override
+ public void stop() {
+ if (runThread != null) {
+ runThread.interrupt();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/2c3cf396/twill-yarn/src/test/java/org/apache/twill/yarn/LogHandlerTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/LogHandlerTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/LogHandlerTestRun.java
index 4d45ad1..f5ead6b 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/LogHandlerTestRun.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/LogHandlerTestRun.java
@@ -23,12 +23,14 @@ import org.apache.twill.api.TwillRunner;
import org.apache.twill.api.logging.LogEntry;
import org.apache.twill.api.logging.LogHandler;
import org.apache.twill.api.logging.LogThrowable;
+import org.apache.twill.api.logging.PrinterLogHandler;
import org.apache.twill.common.Services;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.PrintWriter;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
@@ -62,6 +64,7 @@ public class LogHandlerTestRun extends BaseYarnTest {
TwillRunner runner = YarnTestUtils.getTwillRunner();
TwillController controller = runner.prepare(new LogRunnable())
+ .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
.addLogHandler(logHandler)
.start();