You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2014/10/08 02:04:34 UTC
[1/6] TWILL-87 : Adding Container Placement Policy control.
Repository: incubator-twill
Updated Branches:
refs/heads/site 0004452d8 -> 128a16f35
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnAMClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnAMClient.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnAMClient.java
index 9718150..cb6a58e 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnAMClient.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/AbstractYarnAMClient.java
@@ -22,6 +22,7 @@ import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
import com.google.common.util.concurrent.AbstractIdleService;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Priority;
@@ -35,6 +36,7 @@ import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.net.URL;
import java.util.List;
+import java.util.Set;
import java.util.UUID;
import javax.annotation.Nullable;
@@ -54,6 +56,16 @@ public abstract class AbstractYarnAMClient<T> extends AbstractIdleService implem
private final List<T> requests;
// List of requests pending to remove through allocate call
private final List<T> removes;
+ // List of pending blacklist additions for the next allocate call
+ private final List<String> blacklistAdditions;
+ // List of pending blacklist removals for the next allocate call
+ private final List<String> blacklistRemovals;
+ // Keep track of blacklisted resources
+ private final List<String> blacklistedResources;
+ /**
+ * Contains the set of known unsupported features.
+ */
+ protected final Set<String> unsupportedFeatures = Sets.newHashSet();
protected final ContainerId containerId;
protected InetSocketAddress trackerAddr;
@@ -72,6 +84,9 @@ public abstract class AbstractYarnAMClient<T> extends AbstractIdleService implem
this.containerRequests = ArrayListMultimap.create();
this.requests = Lists.newLinkedList();
this.removes = Lists.newLinkedList();
+ this.blacklistAdditions = Lists.newArrayList();
+ this.blacklistRemovals = Lists.newArrayList();
+ this.blacklistedResources = Lists.newArrayList();
}
@@ -106,6 +121,12 @@ public abstract class AbstractYarnAMClient<T> extends AbstractIdleService implem
removes.clear();
}
+ if (!blacklistAdditions.isEmpty() || !blacklistRemovals.isEmpty()) {
+ updateBlacklist(blacklistAdditions, blacklistRemovals);
+ blacklistAdditions.clear();
+ blacklistRemovals.clear();
+ }
+
AllocateResult allocateResponse = doAllocate(progress);
List<RunnableProcessLauncher> launchers = allocateResponse.getLaunchers();
@@ -147,7 +168,7 @@ public abstract class AbstractYarnAMClient<T> extends AbstractIdleService implem
String[] racks = this.racks.isEmpty() ? null : this.racks.toArray(new String[this.racks.size()]);
for (int i = 0; i < count; i++) {
- T request = createContainerRequest(priority, capability, hosts, racks);
+ T request = createContainerRequest(priority, capability, hosts, racks, relaxLocality);
containerRequests.put(id, request);
requests.add(request);
}
@@ -160,6 +181,31 @@ public abstract class AbstractYarnAMClient<T> extends AbstractIdleService implem
}
@Override
+ public final synchronized void addToBlacklist(String resource) { // same lock as completeContainerRequest
+ if (!blacklistAdditions.contains(resource) && !blacklistedResources.contains(resource)) {
+ blacklistAdditions.add(resource);
+ blacklistedResources.add(resource);
+ blacklistRemovals.remove(resource); // a queued removal of the same resource is superseded
+ }
+ }
+
+ @Override
+ public final synchronized void removeFromBlacklist(String resource) { // same lock as completeContainerRequest
+ if (!blacklistRemovals.contains(resource) && blacklistedResources.contains(resource)) {
+ blacklistRemovals.add(resource);
+ blacklistedResources.remove(resource);
+ blacklistAdditions.remove(resource); // a queued addition of the same resource is superseded
+ }
+ }
+
+ @Override
+ public final synchronized void clearBlacklist() { // same lock as completeContainerRequest
+ blacklistRemovals.addAll(blacklistedResources); // queue a removal for every tracked resource
+ blacklistedResources.clear();
+ blacklistAdditions.clear(); // drop additions that were queued but not yet sent
+ }
+
+ @Override
public final synchronized void completeContainerRequest(String id) {
for (T request : containerRequests.removeAll(id)) {
removes.add(request);
@@ -167,6 +213,19 @@ public abstract class AbstractYarnAMClient<T> extends AbstractIdleService implem
}
/**
+ * Records an unsupported feature.
+ * @param unsupportedFeature A string identifying an unsupported feature.
+ * @return {@code false} if the feature has already been recorded, {@code true} otherwise.
+ */
+ protected boolean recordUnsupportedFeature(String unsupportedFeature) {
+ if (unsupportedFeatures.contains(unsupportedFeature)) {
+ return false;
+ }
+ unsupportedFeatures.add(unsupportedFeature);
+ return true;
+ }
+
+ /**
* Adjusts the given resource capability to fit in the cluster limit.
*
* @param capability The capability to be adjusted.
@@ -181,10 +240,12 @@ public abstract class AbstractYarnAMClient<T> extends AbstractIdleService implem
* @param capability The resource capability.
* @param hosts Sets of hosts. Could be {@code null}.
* @param racks Sets of racks. Could be {@code null}.
+ * @param relaxLocality If set to {@code false}, locality constraints will not be relaxed.
* @return A container request.
*/
protected abstract T createContainerRequest(Priority priority, Resource capability,
- @Nullable String[] hosts, @Nullable String[] racks);
+ @Nullable String[] hosts, @Nullable String[] racks,
+ boolean relaxLocality);
/**
* Adds the given request to prepare for next allocate call.
@@ -197,6 +258,11 @@ public abstract class AbstractYarnAMClient<T> extends AbstractIdleService implem
protected abstract void removeContainerRequest(T request);
/**
+ * Sends the given blacklist additions and removals in the next allocate call.
+ */
+ protected abstract void updateBlacklist(List<String> blacklistAdditions, List<String> blacklistRemovals);
+
+ /**
* Performs actual allocate call to RM.
*/
protected abstract AllocateResult doAllocate(float progress) throws Exception;
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAMClientFactory.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAMClientFactory.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAMClientFactory.java
index 6f47b6c..d932e77 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAMClientFactory.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAMClientFactory.java
@@ -36,14 +36,23 @@ public final class VersionDetectYarnAMClientFactory implements YarnAMClientFacto
public YarnAMClient create() {
try {
Class<YarnAMClient> clz;
- if (YarnUtils.isHadoop20()) {
- // Uses hadoop-2.0 class
- String clzName = getClass().getPackage().getName() + ".Hadoop20YarnAMClient";
- clz = (Class<YarnAMClient>) Class.forName(clzName);
- } else {
- // Uses hadoop-2.1 class
- String clzName = getClass().getPackage().getName() + ".Hadoop21YarnAMClient";
- clz = (Class<YarnAMClient>) Class.forName(clzName);
+ String clzName;
+ switch (YarnUtils.getHadoopVersion()) {
+ case HADOOP_20:
+ // Uses hadoop-2.0 class
+ clzName = getClass().getPackage().getName() + ".Hadoop20YarnAMClient";
+ clz = (Class<YarnAMClient>) Class.forName(clzName);
+ break;
+ case HADOOP_21:
+ // Uses hadoop-2.1 class
+ clzName = getClass().getPackage().getName() + ".Hadoop21YarnAMClient";
+ clz = (Class<YarnAMClient>) Class.forName(clzName);
+ break;
+ default:
+ // Uses hadoop-2.2 class
+ clzName = getClass().getPackage().getName() + ".Hadoop22YarnAMClient";
+ clz = (Class<YarnAMClient>) Class.forName(clzName);
+ break;
}
return clz.getConstructor(Configuration.class).newInstance(conf);
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAppClientFactory.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAppClientFactory.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAppClientFactory.java
index f9db959..04bd30d 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAppClientFactory.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/VersionDetectYarnAppClientFactory.java
@@ -31,7 +31,7 @@ public final class VersionDetectYarnAppClientFactory implements YarnAppClientFac
try {
Class<YarnAppClient> clz;
- if (YarnUtils.isHadoop20()) {
+ if (YarnUtils.getHadoopVersion().equals(YarnUtils.HadoopVersions.HADOOP_20)) {
// Uses hadoop-2.0 class.
String clzName = getClass().getPackage().getName() + ".Hadoop20YarnAppClient";
clz = (Class<YarnAppClient>) Class.forName(clzName);
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClient.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClient.java
index a5a061a..a10181e 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClient.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClient.java
@@ -47,10 +47,12 @@ public interface YarnAMClient extends Service {
protected final Set<String> hosts = Sets.newHashSet();
protected final Set<String> racks = Sets.newHashSet();
protected final Priority priority = Records.newRecord(Priority.class);
+ protected boolean relaxLocality;
protected ContainerRequestBuilder(Resource capability, int count) {
this.capability = capability;
this.count = count;
+ this.relaxLocality = true;
}
public ContainerRequestBuilder addHosts(Collection<String> newHosts) {
@@ -66,6 +68,11 @@ public interface YarnAMClient extends Service {
return this;
}
+ public ContainerRequestBuilder setRelaxLocality(boolean relaxLocality) {
+ this.relaxLocality = relaxLocality;
+ return this;
+ }
+
/**
* Adds a container request. Returns an unique ID for the request.
*/
@@ -83,6 +90,8 @@ public interface YarnAMClient extends Service {
String getHost();
+ int getNMPort();
+
/**
* Sets the tracker address and tracker url. This method should be called before calling {@link #start()}.
*/
@@ -104,6 +113,12 @@ public interface YarnAMClient extends Service {
ContainerRequestBuilder addContainerRequest(Resource capability, int count);
+ void addToBlacklist(String resource);
+
+ void removeFromBlacklist(String resource);
+
+ void clearBlacklist();
+
/**
* Notify a container request is fulfilled.
*
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClient.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClient.java
index 97236f6..3b03a2a 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClient.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAppClient.java
@@ -19,10 +19,13 @@ package org.apache.twill.internal.yarn;
import com.google.common.util.concurrent.Service;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.twill.api.TwillSpecification;
import org.apache.twill.internal.ProcessController;
import org.apache.twill.internal.ProcessLauncher;
+import java.util.List;
+
/**
* Interface for launching Yarn application from client.
*/
@@ -42,4 +45,11 @@ public interface YarnAppClient extends Service {
ProcessLauncher<ApplicationId> createLauncher(String user, TwillSpecification twillSpec) throws Exception;
ProcessController<YarnApplicationReport> createProcessController(ApplicationId appId);
+
+ /**
+ * Returns a list of {@link org.apache.hadoop.yarn.api.records.NodeReport} for the nodes in the cluster.
+ * @return a list of {@link org.apache.hadoop.yarn.api.records.NodeReport} for the nodes in the cluster.
+ * @throws Exception Propagates exceptions thrown by {@link org.apache.hadoop.yarn.client.api.YarnClient}.
+ */
+ List<NodeReport> getNodeReports() throws Exception;
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java
index be30f33..1c65591 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnUtils.java
@@ -21,6 +21,7 @@ import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -57,8 +58,17 @@ import java.util.concurrent.atomic.AtomicReference;
*/
public class YarnUtils {
+ /**
+ * Defines different versions of Hadoop.
+ */
+ public enum HadoopVersions {
+ HADOOP_20,
+ HADOOP_21,
+ HADOOP_22
+ }
+
private static final Logger LOG = LoggerFactory.getLogger(YarnUtils.class);
- private static final AtomicReference<Boolean> HADOOP_20 = new AtomicReference<Boolean>();
+ private static final AtomicReference<HadoopVersions> HADOOP_VERSION = new AtomicReference<HadoopVersions>();
public static YarnLocalResource createLocalResource(LocalFile localFile) {
Preconditions.checkArgument(localFile.getLastModified() >= 0, "Last modified time should be >= 0.");
@@ -202,21 +212,27 @@ public class YarnUtils {
}
/**
- * Returns true if Hadoop-2.0 classes are in the classpath.
+ * Returns {@link org.apache.twill.internal.yarn.YarnUtils.HadoopVersions} for the current build profile,
+ * depending on the classes in the classpath.
+ * @return The version of Hadoop for the current build profile.
*/
- public static boolean isHadoop20() {
- Boolean hadoop20 = HADOOP_20.get();
- if (hadoop20 != null) {
- return hadoop20;
+ public static HadoopVersions getHadoopVersion() {
+ HadoopVersions hadoopVersion = HADOOP_VERSION.get();
+ if (hadoopVersion != null) {
+ return hadoopVersion;
}
try {
Class.forName("org.apache.hadoop.yarn.client.api.NMClient");
- HADOOP_20.set(false);
- return false;
+ try {
+ Class.forName("org.apache.hadoop.yarn.client.cli.LogsCLI");
+ HADOOP_VERSION.set(HadoopVersions.HADOOP_22);
+ } catch (ClassNotFoundException e) {
+ HADOOP_VERSION.set(HadoopVersions.HADOOP_21);
+ }
} catch (ClassNotFoundException e) {
- HADOOP_20.set(true);
- return true;
+ HADOOP_VERSION.set(HadoopVersions.HADOOP_20);
}
+ return HADOOP_VERSION.get();
}
/**
@@ -225,7 +241,7 @@ public class YarnUtils {
private static <T> T createAdapter(Class<T> clz) {
String className = clz.getPackage().getName();
- if (isHadoop20()) {
+ if (getHadoopVersion().equals(HadoopVersions.HADOOP_20)) {
className += ".Hadoop20" + clz.getSimpleName();
} else {
className += ".Hadoop21" + clz.getSimpleName();
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
index 36b4f0b..0e0fc75 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillPreparer.java
@@ -455,8 +455,8 @@ final class YarnTwillPreparer implements TwillPreparer {
}
TwillSpecificationAdapter.create().toJson(
- new DefaultTwillSpecification(spec.getName(), runtimeSpec, spec.getOrders(), eventHandler),
- writer);
+ new DefaultTwillSpecification(spec.getName(), runtimeSpec, spec.getOrders(), spec.getPlacementPolicies(),
+ eventHandler), writer);
} finally {
writer.close();
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-yarn/src/test/java/org/apache/twill/yarn/PlacementPolicyTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/PlacementPolicyTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/PlacementPolicyTestRun.java
new file mode 100644
index 0000000..8981b66
--- /dev/null
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/PlacementPolicyTestRun.java
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.yarn;
+
+import com.google.common.collect.Sets;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.twill.api.Hosts;
+import org.apache.twill.api.Racks;
+import org.apache.twill.api.ResourceReport;
+import org.apache.twill.api.ResourceSpecification;
+import org.apache.twill.api.TwillApplication;
+import org.apache.twill.api.TwillController;
+import org.apache.twill.api.TwillRunResources;
+import org.apache.twill.api.TwillRunner;
+import org.apache.twill.api.TwillSpecification;
+import org.apache.twill.api.logging.PrinterLogHandler;
+import org.apache.twill.discovery.ServiceDiscovered;
+import org.apache.twill.internal.yarn.YarnUtils;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.PrintWriter;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tests for placement policies.
+ */
+public class PlacementPolicyTestRun extends BaseYarnTest {
+ private static final int RUNNABLE_MEMORY = 512;
+ private static final int RUNNABLE_CORES = 1;
+
+ private static List<NodeReport> nodeReports;
+ private static ResourceSpecification resource;
+ private static ResourceSpecification twoInstancesResource;
+
+ /**
+ * Verify the cluster configuration (number and capability of node managers) required for the tests.
+ */
+ @BeforeClass
+ public static void verifyClusterCapability() {
+ // Skip verification when running against older Hadoop versions, which do not support blacklists.
+ Assume.assumeTrue(YarnUtils.getHadoopVersion().equals(YarnUtils.HadoopVersions.HADOOP_22));
+
+ // All runnables in this test class use same resource specification for the sake of convenience.
+ resource = ResourceSpecification.Builder.with()
+ .setVirtualCores(RUNNABLE_CORES)
+ .setMemory(RUNNABLE_MEMORY, ResourceSpecification.SizeUnit.MEGA)
+ .build();
+ twoInstancesResource = ResourceSpecification.Builder.with()
+ .setVirtualCores(RUNNABLE_CORES)
+ .setMemory(RUNNABLE_MEMORY, ResourceSpecification.SizeUnit.MEGA)
+ .setInstances(2)
+ .build();
+
+ try {
+ nodeReports = YarnTestUtils.getNodeReports();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ // The tests need exactly three NodeManagers in the cluster.
+ Assert.assertNotNull(nodeReports);
+ Assert.assertEquals(nodeReports.size(), 3);
+
+ // All NodeManagers should have enough capacity available to accommodate at least two runnables.
+ for (NodeReport nodeReport : nodeReports) {
+ Resource capability = nodeReport.getCapability();
+ Resource used = nodeReport.getUsed();
+ Assert.assertNotNull(capability);
+ if (used != null) {
+ Assert.assertTrue(2 * resource.getMemorySize() < capability.getMemory() - used.getMemory());
+ } else {
+ Assert.assertTrue(2 * resource.getMemorySize() < capability.getMemory());
+ }
+ }
+ }
+
+ /**
+ * Test to verify placement policy without dynamically changing number of instances.
+ */
+ @Test
+ public void testPlacementPolicy() throws Exception {
+ // Ignore test if it is running against older Hadoop versions, which do not support blacklists.
+ Assume.assumeTrue(YarnUtils.getHadoopVersion().equals(YarnUtils.HadoopVersions.HADOOP_22));
+
+ ServiceDiscovered serviceDiscovered;
+ ResourceReport resourceReport;
+ Set<Integer> nmPorts = Sets.newHashSet();
+ Collection<TwillRunResources> distributedResource;
+
+ Assert.assertEquals(getProvisionedNodeManagerCount(), 0);
+ TwillRunner runner = YarnTestUtils.getTwillRunner();
+ TwillController controller = runner.prepare(new PlacementPolicyApplication())
+ .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
+ .withApplicationArguments("PlacementPolicyTest")
+ .withArguments("hostRunnable", "host")
+ .withArguments("hostRackRunnable", "hostRack")
+ .withArguments("distributedRunnable", "distributed")
+ .start();
+
+ try {
+ // All runnables should get started.
+ serviceDiscovered = controller.discoverService("PlacementPolicyTest");
+ Assert.assertTrue(YarnTestUtils.waitForSize(serviceDiscovered, 4, 80));
+
+ // DISTRIBUTED runnables should be provisioned on different nodes.
+ Assert.assertTrue(getProvisionedNodeManagerCount() >= 2);
+ } finally {
+ controller.stopAndWait();
+ }
+
+ // Sleep a bit before exiting.
+ TimeUnit.SECONDS.sleep(2);
+ }
+
+ /**
+ * An application that specifies runnables with different placement policies.
+ */
+ public static final class PlacementPolicyApplication implements TwillApplication {
+
+ @Override
+ public TwillSpecification configure() {
+ return TwillSpecification.Builder.with()
+ .setName("PlacementPolicyApplication")
+ .withRunnable()
+ .add("hostRunnable", new EchoServer(), resource).noLocalFiles()
+ .add("hostRackRunnable", new EchoServer(), resource).noLocalFiles()
+ .add("distributedRunnable", new EchoServer(), twoInstancesResource).noLocalFiles()
+ .withPlacementPolicy()
+ .add(Hosts.of(nodeReports.get(0).getHttpAddress()), "hostRunnable")
+ .add(Hosts.of(nodeReports.get(1).getHttpAddress()), Racks.of("/default-rack"), "hostRackRunnable")
+ .add(TwillSpecification.PlacementPolicy.Type.DISTRIBUTED, "distributedRunnable")
+ .anyOrder()
+ .build();
+ }
+ }
+
+ /**
+ * Test to verify DISTRIBUTED placement policies are honored when the number of instances is changed.
+ * Also, verifies that DISTRIBUTED placement policies do not affect other runnables.
+ */
+ @Test
+ public void testDistributedPlacementPolicy() throws Exception {
+ // Ignore test if it is running against older Hadoop versions, which do not support blacklists.
+ Assume.assumeTrue(YarnUtils.getHadoopVersion().equals(YarnUtils.HadoopVersions.HADOOP_22));
+
+ ServiceDiscovered serviceDiscovered;
+ ResourceReport resourceReport;
+ Set<Integer> nmPorts = Sets.newHashSet();
+ Collection<TwillRunResources> aliceResources;
+ Collection<TwillRunResources> bobResources;
+
+ Assert.assertEquals(getProvisionedNodeManagerCount(), 0);
+ TwillRunner runner = YarnTestUtils.getTwillRunner();
+ TwillController controller = runner.prepare(new DistributedApplication())
+ .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
+ .withApplicationArguments("DistributedTest")
+ .withArguments("Alice", "alice")
+ .withArguments("Bob", "bob")
+ .withArguments("Eve", "eve")
+ .start();
+
+ try {
+ // All runnables should get started with DISTRIBUTED ones being on different nodes.
+ serviceDiscovered = controller.discoverService("DistributedTest");
+ Assert.assertTrue(YarnTestUtils.waitForSize(serviceDiscovered, 3, 60));
+ Assert.assertTrue(getProvisionedNodeManagerCount() >= 2);
+
+ // Spawning a new instance for DISTRIBUTED runnable Alice, which should get a different node.
+ controller.changeInstances("Alice", 2);
+ serviceDiscovered = controller.discoverService("DistributedTest");
+ Assert.assertTrue(YarnTestUtils.waitForSize(serviceDiscovered, 4, 60));
+ Assert.assertTrue(getProvisionedNodeManagerCount() >= 3);
+
+ // Spawning a new instance for DEFAULT runnable Eve,
+ // which should not be affected by placement policies of previous runnables.
+ controller.changeInstances("Eve", 2);
+ serviceDiscovered = controller.discoverService("DistributedTest");
+ Assert.assertTrue(YarnTestUtils.waitForSize(serviceDiscovered, 5, 60));
+
+ // Spawning a new instance for DISTRIBUTED runnable Bob,
+ // which will be forced to give up its placement policy restrictions, since there are only three nodes.
+ controller.changeInstances("Bob", 2);
+ serviceDiscovered = controller.discoverService("DistributedTest");
+ Assert.assertTrue(YarnTestUtils.waitForSize(serviceDiscovered, 6, 60));
+ Assert.assertTrue(getProvisionedNodeManagerCount() >= 3);
+ } finally {
+ controller.stopAndWait();
+ }
+
+ // Sleep a bit before exiting.
+ TimeUnit.SECONDS.sleep(2);
+ }
+
+ /**
+ * An application that runs three runnables, with a DISTRIBUTED placement policy for two of them.
+ */
+ public static final class DistributedApplication implements TwillApplication {
+
+ @Override
+ public TwillSpecification configure() {
+ return TwillSpecification.Builder.with()
+ .setName("DistributedApplication")
+ .withRunnable()
+ .add("Alice", new EchoServer(), resource).noLocalFiles()
+ .add("Bob", new EchoServer(), resource).noLocalFiles()
+ .add("Eve", new EchoServer(), resource).noLocalFiles()
+ .withPlacementPolicy()
+ .add(TwillSpecification.PlacementPolicy.Type.DISTRIBUTED, "Alice", "Bob")
+ .anyOrder()
+ .build();
+ }
+ }
+
+ /**
+ * Test to verify changing instances during application run works for DISTRIBUTED runnables.
+ */
+ @Test
+ public void testChangeInstance() throws InterruptedException {
+ // Ignore test if it is running against older Hadoop versions, which do not support blacklists.
+ Assume.assumeTrue(YarnUtils.getHadoopVersion().equals(YarnUtils.HadoopVersions.HADOOP_22));
+
+ ServiceDiscovered serviceDiscovered;
+ ResourceReport resourceReport;
+ Set<Integer> nmPorts = Sets.newHashSet();
+ Collection<TwillRunResources> aliceResources;
+ Collection<TwillRunResources> bobResources;
+
+ TwillRunner runner = YarnTestUtils.getTwillRunner();
+ TwillController controller = runner.prepare(new ChangeInstanceApplication())
+ .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
+ .withApplicationArguments("DistributedTest")
+ .withArguments("Alice", "alice")
+ .withArguments("Bob", "bob")
+ .withArguments("Eve", "eve")
+ .start();
+
+ try {
+ // All runnables should get started.
+ serviceDiscovered = controller.discoverService("DistributedTest");
+ Assert.assertTrue(YarnTestUtils.waitForSize(serviceDiscovered, 4, 60));
+
+ // Increasing the instance count for runnable Alice by 2.
+ controller.changeInstances("Alice", 4);
+ serviceDiscovered = controller.discoverService("DistributedTest");
+ Assert.assertTrue(YarnTestUtils.waitForSize(serviceDiscovered, 6, 60));
+
+ // Decreasing instance count for runnable Alice by 3.
+ controller.changeInstances("Alice", 1);
+ serviceDiscovered = controller.discoverService("DistributedTest");
+ Assert.assertTrue(YarnTestUtils.waitForSize(serviceDiscovered, 3, 60));
+
+ // Increasing instance count for runnable Bob by 2.
+ controller.changeInstances("Bob", 3);
+ serviceDiscovered = controller.discoverService("DistributedTest");
+ Assert.assertTrue(YarnTestUtils.waitForSize(serviceDiscovered, 5, 60));
+
+ // Increasing instance count for runnable Eve by 2.
+ controller.changeInstances("Eve", 3);
+ serviceDiscovered = controller.discoverService("DistributedTest");
+ Assert.assertTrue(YarnTestUtils.waitForSize(serviceDiscovered, 7, 60));
+ } finally {
+ controller.stopAndWait();
+ }
+
+ // Sleep a bit before exiting.
+ TimeUnit.SECONDS.sleep(2);
+ }
+
+ /**
+ * An application that runs three runnables, with a DISTRIBUTED placement policy for two of them.
+ */
+ public static final class ChangeInstanceApplication implements TwillApplication {
+
+ @Override
+ public TwillSpecification configure() {
+ return TwillSpecification.Builder.with()
+ .setName("DistributedApplication")
+ .withRunnable()
+ .add("Alice", new EchoServer(), twoInstancesResource).noLocalFiles()
+ .add("Bob", new EchoServer(), resource).noLocalFiles()
+ .add("Eve", new EchoServer(), resource).noLocalFiles()
+ .withPlacementPolicy()
+ .add(TwillSpecification.PlacementPolicy.Type.DISTRIBUTED, "Alice", "Bob")
+ .anyOrder()
+ .build();
+ }
+ }
+
+ /**
+ * Test to verify exception is thrown in case a non-existent runnable is specified in a placement policy.
+ */
+ @Test(expected = IllegalArgumentException.class)
+ public void testNonExistentRunnable() {
+ TwillRunner runner = YarnTestUtils.getTwillRunner();
+ TwillController controller = runner.prepare(new FaultyApplication())
+ .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
+ .start();
+ controller.stopAndWait();
+ }
+
+ /**
+ * An application that uses non-existent runnable name while specifying placement policies.
+ */
+ public static final class FaultyApplication implements TwillApplication {
+
+ @Override
+ public TwillSpecification configure() {
+ return TwillSpecification.Builder.with()
+ .setName("FaultyApplication")
+ .withRunnable()
+ .add("Hermione", new EchoServer(), resource).noLocalFiles()
+ .add("Harry", new EchoServer(), resource).noLocalFiles()
+ .add("Ron", new EchoServer(), resource).noLocalFiles()
+ .withPlacementPolicy()
+ .add(TwillSpecification.PlacementPolicy.Type.DEFAULT, "Hermione", "Ron")
+ .add(TwillSpecification.PlacementPolicy.Type.DISTRIBUTED, "Draco", "Harry")
+ .anyOrder()
+ .build();
+ }
+ }
+
+ /**
+ * Test to verify exception is thrown in case a runnable is mentioned in more than one placement policy.
+ */
+ @Test(expected = IllegalArgumentException.class)
+ public void testPlacementPolicySpecification() {
+ TwillRunner runner = YarnTestUtils.getTwillRunner();
+ TwillController controller = runner.prepare(new BadApplication())
+ .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
+ .start();
+ controller.stopAndWait();
+ }
+
+ /**
+ * An application that specifies a runnable name in more than one placement policy.
+ */
+ public static final class BadApplication implements TwillApplication {
+
+ @Override
+ public TwillSpecification configure() {
+ return TwillSpecification.Builder.with()
+ .setName("BadApplication")
+ .withRunnable()
+ .add("Hermione", new EchoServer(), resource).noLocalFiles()
+ .add("Harry", new EchoServer(), resource).noLocalFiles()
+ .add("Ron", new EchoServer(), resource).noLocalFiles()
+ .withPlacementPolicy()
+ .add(TwillSpecification.PlacementPolicy.Type.DEFAULT, "Hermione", "Harry")
+ .add(TwillSpecification.PlacementPolicy.Type.DISTRIBUTED, "Hermione", "Ron")
+ .anyOrder()
+ .build();
+ }
+ }
+
+ /**
+ * Helper function to verify DISTRIBUTED placement policies.
+ * Returns the number of NodeManagers on which runnables got provisioned.
+ * @return number of NodeManagers on which runnables got provisioned.
+ */
+ private static int getProvisionedNodeManagerCount() throws Exception {
+ int provisionedNodeManagerCount = 0;
+ for (NodeReport nodeReport : YarnTestUtils.getNodeReports()) {
+ Resource used = nodeReport.getUsed();
+ if (used != null && used.getMemory() > 0) {
+ provisionedNodeManagerCount++;
+ }
+ }
+ return provisionedNodeManagerCount;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java b/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java
index b8a3915..87c380f 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestSuite.java
@@ -35,7 +35,8 @@ import org.junit.runners.Suite;
LogHandlerTestRun.class,
SessionExpireTestRun.class,
ServiceDiscoveryTestRun.class,
- DebugTestRun.class
+ DebugTestRun.class,
+ PlacementPolicyTestRun.class
})
public final class YarnTestSuite {
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestUtils.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestUtils.java b/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestUtils.java
index bdf97e6..310d097 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestUtils.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/YarnTestUtils.java
@@ -21,10 +21,13 @@ import com.google.common.collect.Iterables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.twill.api.TwillRunner;
import org.apache.twill.api.TwillRunnerService;
+import org.apache.twill.internal.yarn.VersionDetectYarnAppClientFactory;
+import org.apache.twill.internal.yarn.YarnAppClient;
import org.apache.twill.internal.yarn.YarnUtils;
import org.apache.twill.internal.zookeeper.InMemoryZKServer;
import org.junit.rules.TemporaryFolder;
@@ -33,6 +36,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -47,6 +51,7 @@ public final class YarnTestUtils {
private static MiniYARNCluster cluster;
private static TwillRunnerService runnerService;
private static YarnConfiguration config;
+ private static YarnAppClient yarnAppClient;
private static final AtomicBoolean once = new AtomicBoolean(false);
@@ -85,7 +90,7 @@ public final class YarnTestUtils {
Configuration conf = new YarnConfiguration(dfsCluster.getFileSystem().getConf());
- if (YarnUtils.isHadoop20()) {
+ if (YarnUtils.getHadoopVersion().equals(YarnUtils.HadoopVersions.HADOOP_20)) {
conf.set("yarn.resourcemanager.scheduler.class",
"org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler");
} else {
@@ -93,13 +98,14 @@ public final class YarnTestUtils {
"org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler");
conf.set("yarn.scheduler.capacity.resource-calculator",
"org.apache.hadoop.yarn.util.resource.DominantResourceCalculator");
+ conf.setBoolean("yarn.scheduler.include-port-in-node-name", true);
}
conf.set("yarn.nodemanager.vmem-pmem-ratio", "20.1");
conf.set("yarn.nodemanager.vmem-check-enabled", "false");
conf.set("yarn.scheduler.minimum-allocation-mb", "128");
conf.set("yarn.nodemanager.delete.debug-delay-sec", "3600");
- cluster = new MiniYARNCluster("test-cluster", 1, 1, 1);
+ cluster = new MiniYARNCluster("test-cluster", 3, 1, 1);
cluster.init(conf);
cluster.start();
@@ -107,6 +113,9 @@ public final class YarnTestUtils {
runnerService = createTwillRunnerService();
runnerService.startAndWait();
+
+ yarnAppClient = new VersionDetectYarnAppClientFactory().create(conf);
+ yarnAppClient.start();
}
public static final boolean finish() {
@@ -115,6 +124,7 @@ public final class YarnTestUtils {
cluster.stop();
dfsCluster.shutdown();
zkServer.stopAndWait();
+ yarnAppClient.stop();
return true;
}
@@ -140,6 +150,15 @@ public final class YarnTestUtils {
return runner;
}
+ /**
+ * Returns {@link org.apache.hadoop.yarn.api.records.NodeReport} for the nodes in the MiniYARNCluster.
+ * The call is delegated to the shared {@link YarnAppClient} that this utility class creates and starts
+ * after the cluster and the runner service have been started.
+ * @return a list of {@link org.apache.hadoop.yarn.api.records.NodeReport} for the nodes in the cluster.
+ * @throws Exception Propagates exceptions thrown by {@link org.apache.hadoop.yarn.client.api.YarnClient}.
+ */
+ public static final List<NodeReport> getNodeReports() throws Exception {
+ return yarnAppClient.getNodeReports();
+ }
+
public static final <T> boolean waitForSize(Iterable<T> iterable, int count, int limit) throws InterruptedException {
int trial = 0;
int size = Iterables.size(iterable);
[2/6] git commit: TWILL-87 : Adding Container Placement Policy
control.
Posted by ch...@apache.org.
TWILL-87 : Adding Container Placement Policy control.
Signed-off-by: Terence Yim <ch...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/8cf4cd02
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/8cf4cd02
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/8cf4cd02
Branch: refs/heads/site
Commit: 8cf4cd02c922595c703eff4076ca74633421b301
Parents: 34af22b
Author: Gourav Khaneja <go...@gmail.com>
Authored: Sat Aug 2 00:25:02 2014 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Sat Aug 2 23:04:56 2014 -0700
----------------------------------------------------------------------
pom.xml | 2 +
.../main/java/org/apache/twill/api/Hosts.java | 65 +++
.../main/java/org/apache/twill/api/Racks.java | 65 +++
.../apache/twill/api/ResourceSpecification.java | 58 +--
.../apache/twill/api/TwillSpecification.java | 175 ++++++++-
.../internal/DefaultResourceSpecification.java | 25 --
.../internal/DefaultTwillSpecification.java | 86 +++-
.../org/apache/twill/internal/Constants.java | 6 +
.../json/ResourceSpecificationCodec.java | 6 +-
.../json/TwillSpecificationAdapter.java | 3 +
.../internal/json/TwillSpecificationCodec.java | 36 +-
.../json/ResourceSpecificationCodecTest.java | 19 +-
.../internal/yarn/Hadoop20YarnAMClient.java | 17 +-
.../internal/yarn/Hadoop20YarnAppClient.java | 7 +
.../internal/yarn/Hadoop21YarnAMClient.java | 28 +-
.../internal/yarn/Hadoop21YarnAppClient.java | 8 +
.../internal/yarn/Hadoop22YarnAMClient.java | 42 ++
.../appmaster/AllocationSpecification.java | 105 +++++
.../appmaster/ApplicationMasterService.java | 169 +++++++-
.../appmaster/PlacementPolicyManager.java | 104 +++++
.../internal/appmaster/ProvisionRequest.java | 11 +
.../appmaster/RunnableContainerRequest.java | 11 +-
.../appmaster/RunnableProcessLauncher.java | 5 +-
.../internal/appmaster/RunningContainers.java | 54 +++
.../internal/yarn/AbstractYarnAMClient.java | 70 +++-
.../yarn/VersionDetectYarnAMClientFactory.java | 25 +-
.../yarn/VersionDetectYarnAppClientFactory.java | 2 +-
.../twill/internal/yarn/YarnAMClient.java | 15 +
.../twill/internal/yarn/YarnAppClient.java | 10 +
.../apache/twill/internal/yarn/YarnUtils.java | 38 +-
.../apache/twill/yarn/YarnTwillPreparer.java | 4 +-
.../twill/yarn/PlacementPolicyTestRun.java | 391 +++++++++++++++++++
.../org/apache/twill/yarn/YarnTestSuite.java | 3 +-
.../org/apache/twill/yarn/YarnTestUtils.java | 23 +-
34 files changed, 1522 insertions(+), 166 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index dfb7394..67ab4fc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -508,6 +508,7 @@
<configuration>
<sources>
<source>src/main/hadoop21</source>
+ <source>src/main/hadoop22</source>
</sources>
</configuration>
</execution>
@@ -552,6 +553,7 @@
<configuration>
<sources>
<source>src/main/hadoop21</source>
+ <source>src/main/hadoop22</source>
</sources>
</configuration>
</execution>
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-api/src/main/java/org/apache/twill/api/Hosts.java
----------------------------------------------------------------------
diff --git a/twill-api/src/main/java/org/apache/twill/api/Hosts.java b/twill-api/src/main/java/org/apache/twill/api/Hosts.java
new file mode 100644
index 0000000..90a27ad
--- /dev/null
+++ b/twill-api/src/main/java/org/apache/twill/api/Hosts.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.api;
+
+import com.google.common.collect.ImmutableSet;
+
+import java.util.Arrays;
+import java.util.Set;
+
+/**
+ * Represents an immutable set of hosts.
+ * Both constructors copy their arguments into an {@link ImmutableSet}, so the contents cannot be
+ * changed after construction and a host given more than once is kept only once.
+ */
+
+public class Hosts {
+ private final Set<String> hosts;
+ /**
+ * Creates an instance holding an immutable copy of the given hosts.
+ * @param hosts The hosts to be copied.
+ */
+ public Hosts(Set<String> hosts) {
+ this.hosts = ImmutableSet.copyOf(hosts);
+ }
+ /**
+ * Creates an instance from one required host followed by any number of additional hosts.
+ * @param host A host to be added.
+ * @param moreHosts Additional hosts to be added; may be empty.
+ */
+ public Hosts(String host, String...moreHosts) {
+ this.hosts = ImmutableSet.<String>builder()
+ .add(host)
+ .addAll(Arrays.asList(moreHosts))
+ .build();
+ }
+
+ /**
+ * Convenience method to create an instance of {@link org.apache.twill.api.Hosts}.
+ * @param host A host to be added.
+ * @param moreHosts Additional hosts to be added; may be empty.
+ * @return An instance of {@link org.apache.twill.api.Hosts} containing specified hosts.
+ */
+ public static Hosts of(String host, String...moreHosts) {
+ return new Hosts(host, moreHosts);
+ }
+
+ /**
+ * Get the set of hosts.
+ * @return the immutable set of hosts held by this instance.
+ */
+ public Set<String> get() {
+ return this.hosts;
+ }
+
+ @Override
+ public String toString() {
+ return this.hosts.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-api/src/main/java/org/apache/twill/api/Racks.java
----------------------------------------------------------------------
diff --git a/twill-api/src/main/java/org/apache/twill/api/Racks.java b/twill-api/src/main/java/org/apache/twill/api/Racks.java
new file mode 100644
index 0000000..04eedd0
--- /dev/null
+++ b/twill-api/src/main/java/org/apache/twill/api/Racks.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.api;
+
+import com.google.common.collect.ImmutableSet;
+
+import java.util.Arrays;
+import java.util.Set;
+
+/**
+ * Represents an immutable set of racks.
+ * Both constructors copy their arguments into an {@link ImmutableSet}, so the contents cannot be
+ * changed after construction and a rack given more than once is kept only once.
+ */
+
+public class Racks {
+ private final Set<String> racks;
+ /**
+ * Creates an instance holding an immutable copy of the given racks.
+ * @param racks The racks to be copied.
+ */
+ public Racks(Set<String> racks) {
+ this.racks = ImmutableSet.copyOf(racks);
+ }
+ /**
+ * Creates an instance from one required rack followed by any number of additional racks.
+ * @param rack A rack to be added.
+ * @param moreRacks Additional racks to be added; may be empty.
+ */
+ public Racks(String rack, String...moreRacks) {
+ this.racks = ImmutableSet.<String>builder()
+ .add(rack)
+ .addAll(Arrays.asList(moreRacks))
+ .build();
+ }
+
+ /**
+ * Convenience method to create an instance of {@link org.apache.twill.api.Racks}.
+ * @param rack A rack to be added.
+ * @param moreRacks Additional racks to be added; may be empty.
+ * @return An instance of {@link org.apache.twill.api.Racks} containing specified racks.
+ */
+ public static Racks of(String rack, String...moreRacks) {
+ return new Racks(rack, moreRacks);
+ }
+
+ /**
+ * Get the set of racks.
+ * @return the immutable set of racks held by this instance.
+ */
+ public Set<String> get() {
+ return this.racks;
+ }
+
+ @Override
+ public String toString() {
+ return this.racks.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-api/src/main/java/org/apache/twill/api/ResourceSpecification.java
----------------------------------------------------------------------
diff --git a/twill-api/src/main/java/org/apache/twill/api/ResourceSpecification.java b/twill-api/src/main/java/org/apache/twill/api/ResourceSpecification.java
index 4b602bc..2865caf 100644
--- a/twill-api/src/main/java/org/apache/twill/api/ResourceSpecification.java
+++ b/twill-api/src/main/java/org/apache/twill/api/ResourceSpecification.java
@@ -17,14 +17,8 @@
*/
package org.apache.twill.api;
-import com.google.common.collect.Iterables;
import org.apache.twill.internal.DefaultResourceSpecification;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-
/**
* This interface provides specifications for resource requirements including set and get methods
* for number of cores, amount of memory, and number of instances.
@@ -85,20 +79,6 @@ public interface ResourceSpecification {
int getInstances();
/**
- * Returns the execution hosts, expects Fully Qualified Domain Names host + domain.
- * This is a suggestion for the scheduler depending on cluster load it may ignore it
- * @return An array containing the hosts where the containers should run
- */
- List<String> getHosts();
-
- /**
- * Returns the execution racks.
- * This is a suggestion for the scheduler depending on cluster load it may ignore it
- * @return An array containing the racks where the containers should run
- */
- List<String> getRacks();
-
- /**
* Builder for creating {@link ResourceSpecification}.
*/
static final class Builder {
@@ -108,8 +88,6 @@ public interface ResourceSpecification {
private int uplink = -1;
private int downlink = -1;
private int instances = 1;
- private List<String> hosts = new LinkedList<String>();
- private List<String> racks = new LinkedList<String>();
public static CoreSetter with() {
return new Builder().new CoreSetter();
@@ -150,42 +128,10 @@ public interface ResourceSpecification {
}
public final class AfterUplink extends Build {
- public AfterDownlink setDownlink(int downlink, SizeUnit unit) {
+ public Done setDownlink(int downlink, SizeUnit unit) {
Builder.this.downlink = downlink * unit.multiplier;
- return new AfterDownlink();
- }
- }
-
- public final class AfterHosts extends Build {
- public Done setRacks(String... racks) {
- if (racks != null) {
- Builder.this.racks = Arrays.asList(racks);
- }
return new Done();
}
-
- public Done setRacks(Iterable<String> racks) {
- if (racks != null) {
- Iterables.addAll(Builder.this.racks, racks);
- }
- return new Done();
- }
- }
-
- public final class AfterDownlink extends Build {
- public AfterHosts setHosts(String... hosts) {
- if (hosts != null) {
- Builder.this.hosts = Arrays.asList(hosts);
- }
- return new AfterHosts();
- }
-
- public AfterHosts setHosts(Iterable<String> hosts) {
- if (hosts != null) {
- Iterables.addAll(Builder.this.hosts, hosts);
- }
- return new AfterHosts();
- }
}
public final class Done extends Build {
@@ -193,7 +139,7 @@ public interface ResourceSpecification {
public abstract class Build {
public ResourceSpecification build() {
- return new DefaultResourceSpecification(cores, memory, instances, uplink, downlink, hosts, racks);
+ return new DefaultResourceSpecification(cores, memory, instances, uplink, downlink);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-api/src/main/java/org/apache/twill/api/TwillSpecification.java
----------------------------------------------------------------------
diff --git a/twill-api/src/main/java/org/apache/twill/api/TwillSpecification.java b/twill-api/src/main/java/org/apache/twill/api/TwillSpecification.java
index 3931d04..ed37190 100644
--- a/twill-api/src/main/java/org/apache/twill/api/TwillSpecification.java
+++ b/twill-api/src/main/java/org/apache/twill/api/TwillSpecification.java
@@ -60,6 +60,46 @@ public interface TwillSpecification {
}
/**
+ * Defines a container placement policy.
+ */
+ interface PlacementPolicy {
+
+ /**
+ * Lists different types of Placement Policies available.
+ */
+ enum Type {
+ /**
+ * Runnables should be scattered over different hosts.
+ */
+ DISTRIBUTED,
+ /**
+ * No specific placement policy.
+ */
+ DEFAULT
+ }
+
+ /**
+ * Returns the names of the runnables grouped under this placement policy.
+ * @return Set of {@link org.apache.twill.api.TwillRunnable} names that belong to this placement policy.
+ */
+ Set<String> getNames();
+
+ /**
+ * Returns the kind of placement requested for the runnables of this policy.
+ * @return {@link org.apache.twill.api.TwillSpecification.PlacementPolicy.Type Type} of this placement policy.
+ */
+ Type getType();
+
+ /**
+ * Returns the hosts given for this placement policy.
+ * @return Set of hosts associated with this placement policy.
+ */
+ Set<String> getHosts();
+
+ /**
+ * Returns the racks given for this placement policy.
+ * @return Set of racks associated with this placement policy.
+ */
+ Set<String> getRacks();
+ }
+
+ /**
* @return Name of the application.
*/
String getName();
@@ -75,6 +115,11 @@ public interface TwillSpecification {
List<Order> getOrders();
/**
+ * @return Returns all {@link org.apache.twill.api.TwillSpecification.PlacementPolicy} for this application.
+ */
+ List<PlacementPolicy> getPlacementPolicies();
+
+ /**
* @return The {@link EventHandlerSpecification} for the {@link EventHandler} to be used for this application,
* or {@code null} if no event handler has been provided.
*/
@@ -89,6 +134,7 @@ public interface TwillSpecification {
private String name;
private Map<String, RuntimeSpecification> runnables = Maps.newHashMap();
private List<Order> orders = Lists.newArrayList();
+ private List<PlacementPolicy> placementPolicies = Lists.newArrayList();
private EventHandlerSpecification eventHandler;
public static NameSetter with() {
@@ -128,6 +174,8 @@ public interface TwillSpecification {
FirstOrder withOrder();
AfterOrder anyOrder();
+
+ PlacementPolicySetter withPlacementPolicy();
}
public final class RunnableSetter implements MoreRunnable, AfterRunnable {
@@ -170,6 +218,11 @@ public interface TwillSpecification {
public AfterOrder anyOrder() {
return new OrderSetter();
}
+
+ @Override
+ public PlacementPolicySetter withPlacementPolicy() {
+ return new PlacementPolicySetter();
+ }
}
/**
@@ -252,6 +305,125 @@ public interface TwillSpecification {
}
}
+ /**
+ * Interface to add placement policies to the application.
+ * Each {@code add} call groups the named runnables into one placement policy. In
+ * {@link PlacementPolicySetter} every name must be non-null, must refer to a runnable already
+ * added to the builder, and must not already belong to another placement policy.
+ */
+ public interface MorePlacementPolicies {
+
+ /**
+ * Specify hosts for a group of runnables.
+ * {@link PlacementPolicySetter} records this as a policy of type {@link PlacementPolicy.Type#DEFAULT}.
+ * @param hosts {@link org.apache.twill.api.Hosts} specifying a set of hosts.
+ * @param runnableName a runnable name.
+ * @param runnableNames additional runnable names belonging to the same policy.
+ * @return A reference to either add more placement policies or skip to defining execution order.
+ */
+ PlacementPolicySetter add(Hosts hosts, String runnableName, String... runnableNames);
+
+ /**
+ * Specify racks for a group of runnables.
+ * {@link PlacementPolicySetter} records this as a policy of type {@link PlacementPolicy.Type#DEFAULT}.
+ * @param racks {@link org.apache.twill.api.Racks} specifying a set of racks.
+ * @param runnableName a runnable name.
+ * @param runnableNames additional runnable names belonging to the same policy.
+ * @return A reference to either add more placement policies or skip to defining execution order.
+ */
+ PlacementPolicySetter add(Racks racks, String runnableName, String... runnableNames);
+
+ /**
+ * Specify hosts and racks for a group of runnables.
+ * {@link PlacementPolicySetter} records this as a policy of type {@link PlacementPolicy.Type#DEFAULT}.
+ * @param hosts {@link org.apache.twill.api.Hosts} specifying a set of hosts.
+ * @param racks {@link org.apache.twill.api.Racks} specifying a set of racks.
+ * @param runnableName a runnable name.
+ * @param runnableNames additional runnable names belonging to the same policy.
+ * @return A reference to either add more placement policies or skip to defining execution order.
+ */
+ PlacementPolicySetter add(Hosts hosts, Racks racks, String runnableName, String... runnableNames);
+
+ /**
+ * Specify a placement policy type for a group of runnables, without hosts or racks.
+ * @param type {@link PlacementPolicy.Type} specifying a specific placement policy type.
+ * @param runnableName a runnable name.
+ * @param runnableNames additional runnable names belonging to the same policy.
+ * @return A reference to either add more placement policies or skip to defining execution order.
+ */
+ PlacementPolicySetter add(PlacementPolicy.Type type, String runnableName, String... runnableNames);
+ }
+
+ /**
+ * Interface to define execution order after adding placement policies.
+ * It offers the same two choices that are available directly after adding runnables.
+ */
+ public interface AfterPlacementPolicy {
+ /**
+ * Start defining execution order.
+ * @return A reference for specifying the first group of runnables in the order.
+ */
+ FirstOrder withOrder();
+
+ /**
+ * No particular execution order is needed.
+ * @return A reference for completing the specification.
+ */
+ AfterOrder anyOrder();
+ }
+
+ public final class PlacementPolicySetter implements MorePlacementPolicies, AfterPlacementPolicy {
+ // Builder step that collects placement policies; all add(...) overloads funnel into addPlacementPolicy.
+ @Override
+ public PlacementPolicySetter add(Hosts hosts, String runnableName, String... runnableNames) {
+ return addPlacementPolicy(PlacementPolicy.Type.DEFAULT, hosts, null, runnableName, runnableNames);
+ }
+
+ @Override
+ public PlacementPolicySetter add(Racks racks, String runnableName, String... runnableNames) {
+ return addPlacementPolicy(PlacementPolicy.Type.DEFAULT, null, racks, runnableName, runnableNames);
+ }
+
+ @Override
+ public PlacementPolicySetter add(Hosts hosts, Racks racks, String runnableName, String... runnableNames) {
+ return addPlacementPolicy(PlacementPolicy.Type.DEFAULT, hosts, racks, runnableName, runnableNames);
+ }
+
+ @Override
+ public PlacementPolicySetter add(PlacementPolicy.Type type, String runnableName, String...runnableNames) {
+ return addPlacementPolicy(type, null, null, runnableName, runnableNames);
+ }
+ /**
+ * Validates the given runnable names and records a single placement policy covering all of them.
+ * Every name must be non-null, must be a key of the builder's runnables map, and must not be
+ * contained in a placement policy added earlier; a failed check throws
+ * {@link IllegalArgumentException} before anything is recorded. A name repeated within the
+ * same call is accepted, since the names are gathered into a set.
+ * @param type type of the placement policy.
+ * @param hosts hosts for the policy, or {@code null} when none are specified.
+ * @param racks racks for the policy, or {@code null} when none are specified.
+ * @param runnableName first runnable name of the policy.
+ * @param runnableNames additional runnable names of the policy.
+ * @return this setter, so that more policies can be added or the order can be defined.
+ */
+ private PlacementPolicySetter addPlacementPolicy(PlacementPolicy.Type type, Hosts hosts, Racks racks,
+ String runnableName, String...runnableNames) {
+ Preconditions.checkArgument(runnableName != null, "Name cannot be null.");
+ Preconditions.checkArgument(runnables.containsKey(runnableName), "Runnable not exists.");
+ Preconditions.checkArgument(!contains(runnableName),
+ "Runnable (" + runnableName + ") cannot belong to more than one Placement Policy");
+ Set<String> runnableNamesSet = Sets.newHashSet(runnableName);
+ for (String name : runnableNames) {
+ Preconditions.checkArgument(name != null, "Name cannot be null.");
+ Preconditions.checkArgument(runnables.containsKey(name), "Runnable not exists.");
+ Preconditions.checkArgument(!contains(name),
+ "Runnable (" + name + ") cannot belong to more than one Placement Policy");
+ runnableNamesSet.add(name);
+ }
+ placementPolicies.add(
+ new DefaultTwillSpecification.DefaultPlacementPolicy(runnableNamesSet, type, hosts, racks));
+ return this;
+ }
+ /**
+ * Tells whether the given runnable name is already part of a previously added placement policy.
+ * @param runnableName name to look up.
+ * @return {@code true} if any recorded placement policy contains the name.
+ */
+ private boolean contains(String runnableName) {
+ for (PlacementPolicy placementPolicy : placementPolicies) {
+ if (placementPolicy.getNames().contains(runnableName)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public FirstOrder withOrder() {
+ return new OrderSetter();
+ }
+
+ @Override
+ public AfterOrder anyOrder() {
+ return new OrderSetter();
+ }
+ }
+
public interface FirstOrder {
NextOrder begin(String name, String...names);
}
@@ -303,8 +475,7 @@ public interface TwillSpecification {
// For all unordered runnables, add it to the end of orders list
orders.add(new DefaultTwillSpecification.DefaultOrder(runnableNames, Order.Type.STARTED));
-
- return new DefaultTwillSpecification(name, runnables, orders, eventHandler);
+ return new DefaultTwillSpecification(name, runnables, orders, placementPolicies, eventHandler);
}
private void addOrder(final Order.Type type, String name, String...names) {
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-api/src/main/java/org/apache/twill/internal/DefaultResourceSpecification.java
----------------------------------------------------------------------
diff --git a/twill-api/src/main/java/org/apache/twill/internal/DefaultResourceSpecification.java b/twill-api/src/main/java/org/apache/twill/internal/DefaultResourceSpecification.java
index 2998165..1327ce5 100644
--- a/twill-api/src/main/java/org/apache/twill/internal/DefaultResourceSpecification.java
+++ b/twill-api/src/main/java/org/apache/twill/internal/DefaultResourceSpecification.java
@@ -17,12 +17,8 @@
*/
package org.apache.twill.internal;
-import com.google.common.collect.ImmutableList;
import org.apache.twill.api.ResourceSpecification;
-import java.util.Collections;
-import java.util.List;
-
/**
* Straightforward implementation of {@link org.apache.twill.api.ResourceSpecification}.
*/
@@ -32,24 +28,13 @@ public final class DefaultResourceSpecification implements ResourceSpecification
private final int instances;
private final int uplink;
private final int downlink;
- private final List<String> hosts;
- private final List<String> racks;
public DefaultResourceSpecification(int virtualCores, int memorySize, int instances, int uplink, int downlink) {
- this(virtualCores, memorySize, instances, uplink, downlink,
- Collections.<String>emptyList(), Collections.<String>emptyList());
- }
-
- public DefaultResourceSpecification(int virtualCores, int memorySize, int instances,
- int uplink, int downlink,
- List<String> hosts, List<String> racks) {
this.virtualCores = virtualCores;
this.memorySize = memorySize;
this.instances = instances;
this.uplink = uplink;
this.downlink = downlink;
- this.hosts = ImmutableList.copyOf(hosts);
- this.racks = ImmutableList.copyOf(racks);
}
@Deprecated
@@ -74,16 +59,6 @@ public final class DefaultResourceSpecification implements ResourceSpecification
}
@Override
- public List<String> getHosts() {
- return this.hosts;
- }
-
- @Override
- public List<String> getRacks() {
- return this.racks;
- }
-
- @Override
public int getUplink() {
return uplink;
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-api/src/main/java/org/apache/twill/internal/DefaultTwillSpecification.java
----------------------------------------------------------------------
diff --git a/twill-api/src/main/java/org/apache/twill/internal/DefaultTwillSpecification.java b/twill-api/src/main/java/org/apache/twill/internal/DefaultTwillSpecification.java
index fdb8b32..184afb3 100644
--- a/twill-api/src/main/java/org/apache/twill/internal/DefaultTwillSpecification.java
+++ b/twill-api/src/main/java/org/apache/twill/internal/DefaultTwillSpecification.java
@@ -22,9 +22,12 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.twill.api.EventHandlerSpecification;
+import org.apache.twill.api.Hosts;
+import org.apache.twill.api.Racks;
import org.apache.twill.api.RuntimeSpecification;
import org.apache.twill.api.TwillSpecification;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -38,13 +41,16 @@ public final class DefaultTwillSpecification implements TwillSpecification {
private final String name;
private final Map<String, RuntimeSpecification> runnables;
private final List<Order> orders;
+ private final List<PlacementPolicy> placementPolicies;
private final EventHandlerSpecification eventHandler;
public DefaultTwillSpecification(String name, Map<String, RuntimeSpecification> runnables,
- List<Order> orders, EventHandlerSpecification eventHandler) {
+ List<Order> orders, List<PlacementPolicy> placementPolicies,
+ EventHandlerSpecification eventHandler) {
this.name = name;
this.runnables = ImmutableMap.copyOf(runnables);
this.orders = ImmutableList.copyOf(orders);
+ this.placementPolicies = placementPolicies;
this.eventHandler = eventHandler;
}
@@ -63,6 +69,11 @@ public final class DefaultTwillSpecification implements TwillSpecification {
return orders;
}
+ @Override
+ public List<PlacementPolicy> getPlacementPolicies() {
+ return placementPolicies;
+ }
+
@Nullable
@Override
public EventHandlerSpecification getEventHandler() {
@@ -100,4 +111,77 @@ public final class DefaultTwillSpecification implements TwillSpecification {
.toString();
}
}
+
+ /**
+ * Straightforward implementation of {@link org.apache.twill.api.TwillSpecification.PlacementPolicy}.
+ * The runnable names are copied into an immutable set. Hosts and racks are optional and are kept
+ * as {@code null} when not given; the getters then report an empty set.
+ */
+ public static final class DefaultPlacementPolicy implements PlacementPolicy {
+
+ private final Set<String> names;
+ private final Type type;
+ private final Hosts hosts;
+ private final Racks racks;
+ /**
+ * Creates a placement policy.
+ * @param names names of the runnables that belong to this policy; copied into an immutable set.
+ * @param type type of this placement policy.
+ * @param hosts hosts associated with this policy, or {@code null} if none.
+ * @param racks racks associated with this policy, or {@code null} if none.
+ */
+ public DefaultPlacementPolicy(Iterable<String> names, Type type, Hosts hosts, Racks racks) {
+ this.names = ImmutableSet.copyOf(names);
+ this.type = type;
+ this.hosts = hosts;
+ this.racks = racks;
+ }
+ /**
+ * Creates a placement policy without hosts and racks.
+ * @param names names of the runnables that belong to this policy.
+ * @param type type of this placement policy.
+ */
+ public DefaultPlacementPolicy(Iterable<String> names, Type type) {
+ this(names, type, null, null);
+ }
+
+ /**
+ * @return Set of {@link org.apache.twill.api.TwillRunnable} names that belong to this placement policy.
+ */
+ @Override
+ public Set<String> getNames() {
+ return names;
+ }
+
+ /**
+ * @return {@link org.apache.twill.api.TwillSpecification.PlacementPolicy.Type Type} of this placement policy.
+ */
+ @Override
+ public Type getType() {
+ return type;
+ }
+
+ /**
+ * @return Set of hosts associated with this placement policy; an empty set if no hosts were given.
+ */
+ @Override
+ public Set<String> getHosts() {
+ if (this.hosts == null) {
+ return Collections.emptySet();
+ }
+ return this.hosts.get();
+ }
+
+ /**
+ * @return Set of racks associated with this placement policy; an empty set if no racks were given.
+ */
+ @Override
+ public Set<String> getRacks() {
+ if (this.racks == null) {
+ return Collections.emptySet();
+ }
+ return this.racks.get();
+ }
+
+ /**
+ * @return String representation of Placement Policy, listing names, type, hosts and racks.
+ */
+ @Override
+ public String toString() {
+ return Objects.toStringHelper(this)
+ .add("names", names)
+ .add("type", type)
+ .add("hosts", hosts)
+ .add("racks", racks)
+ .toString();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-core/src/main/java/org/apache/twill/internal/Constants.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/Constants.java b/twill-core/src/main/java/org/apache/twill/internal/Constants.java
index 9912638..fbc6e70 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/Constants.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/Constants.java
@@ -31,6 +31,12 @@ public final class Constants {
public static final long PROVISION_TIMEOUT = 30000;
+ /**
+ * Milliseconds AM should wait for RM to allocate a constrained provision request.
+ * On timeout, AM relaxes the request constraints.
+ */
+ public static final int CONSTRAINED_PROVISION_REQUEST_TIMEOUT = 5000;
+
/** Memory size of AM. */
public static final int APP_MASTER_MEMORY_MB = 512;
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-core/src/main/java/org/apache/twill/internal/json/ResourceSpecificationCodec.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/json/ResourceSpecificationCodec.java b/twill-core/src/main/java/org/apache/twill/internal/json/ResourceSpecificationCodec.java
index 5f7d7ae..412e406 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/json/ResourceSpecificationCodec.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/json/ResourceSpecificationCodec.java
@@ -44,8 +44,6 @@ final class ResourceSpecificationCodec implements JsonSerializer<ResourceSpecifi
json.addProperty("instances", src.getInstances());
json.addProperty("uplink", src.getUplink());
json.addProperty("downlink", src.getDownlink());
- json.add("hosts", context.serialize(src.getHosts()));
- json.add("racks", context.serialize(src.getRacks()));
return json;
}
@@ -59,9 +57,7 @@ final class ResourceSpecificationCodec implements JsonSerializer<ResourceSpecifi
jsonObj.get("memorySize").getAsInt(),
jsonObj.get("instances").getAsInt(),
jsonObj.get("uplink").getAsInt(),
- jsonObj.get("downlink").getAsInt(),
- hosts,
- racks);
+ jsonObj.get("downlink").getAsInt());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-core/src/main/java/org/apache/twill/internal/json/TwillSpecificationAdapter.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/json/TwillSpecificationAdapter.java b/twill-core/src/main/java/org/apache/twill/internal/json/TwillSpecificationAdapter.java
index 67c15a2..d882096 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/json/TwillSpecificationAdapter.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/json/TwillSpecificationAdapter.java
@@ -36,6 +36,7 @@ import org.apache.twill.api.TwillRunnableSpecification;
import org.apache.twill.api.TwillSpecification;
import org.apache.twill.internal.json.TwillSpecificationCodec.EventHandlerSpecificationCoder;
import org.apache.twill.internal.json.TwillSpecificationCodec.TwillSpecificationOrderCoder;
+import org.apache.twill.internal.json.TwillSpecificationCodec.TwillSpecificationPlacementPolicyCoder;
import java.io.File;
import java.io.IOException;
@@ -61,6 +62,8 @@ public final class TwillSpecificationAdapter {
.serializeNulls()
.registerTypeAdapter(TwillSpecification.class, new TwillSpecificationCodec())
.registerTypeAdapter(TwillSpecification.Order.class, new TwillSpecificationOrderCoder())
+ .registerTypeAdapter(TwillSpecification.PlacementPolicy.class,
+ new TwillSpecificationPlacementPolicyCoder())
.registerTypeAdapter(EventHandlerSpecification.class, new EventHandlerSpecificationCoder())
.registerTypeAdapter(RuntimeSpecification.class, new RuntimeSpecificationCodec())
.registerTypeAdapter(TwillRunnableSpecification.class, new TwillRunnableSpecificationCodec())
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-core/src/main/java/org/apache/twill/internal/json/TwillSpecificationCodec.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/json/TwillSpecificationCodec.java b/twill-core/src/main/java/org/apache/twill/internal/json/TwillSpecificationCodec.java
index d46434a..96df950 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/json/TwillSpecificationCodec.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/json/TwillSpecificationCodec.java
@@ -26,6 +26,8 @@ import com.google.gson.JsonParseException;
import com.google.gson.JsonSerializationContext;
import com.google.gson.JsonSerializer;
import org.apache.twill.api.EventHandlerSpecification;
+import org.apache.twill.api.Hosts;
+import org.apache.twill.api.Racks;
import org.apache.twill.api.RuntimeSpecification;
import org.apache.twill.api.TwillSpecification;
import org.apache.twill.internal.DefaultEventHandlerSpecification;
@@ -50,6 +52,8 @@ final class TwillSpecificationCodec implements JsonSerializer<TwillSpecification
new TypeToken<Map<String, RuntimeSpecification>>() { }.getType()));
json.add("orders", context.serialize(src.getOrders(),
new TypeToken<List<TwillSpecification.Order>>() { }.getType()));
+ json.add("placementPolicies", context.serialize(
+ src.getPlacementPolicies(), new TypeToken<List<TwillSpecification.PlacementPolicy>>() { }.getType()));
EventHandlerSpecification eventHandler = src.getEventHandler();
if (eventHandler != null) {
json.add("handler", context.serialize(eventHandler, EventHandlerSpecification.class));
@@ -68,6 +72,8 @@ final class TwillSpecificationCodec implements JsonSerializer<TwillSpecification
jsonObj.get("runnables"), new TypeToken<Map<String, RuntimeSpecification>>() { }.getType());
List<TwillSpecification.Order> orders = context.deserialize(
jsonObj.get("orders"), new TypeToken<List<TwillSpecification.Order>>() { }.getType());
+ List<TwillSpecification.PlacementPolicy> placementPolicies = context.deserialize(
+ jsonObj.get("placementPolicies"), new TypeToken<List<TwillSpecification.PlacementPolicy>>() { }.getType());
JsonElement handler = jsonObj.get("handler");
EventHandlerSpecification eventHandler = null;
@@ -75,7 +81,7 @@ final class TwillSpecificationCodec implements JsonSerializer<TwillSpecification
eventHandler = context.deserialize(handler, EventHandlerSpecification.class);
}
- return new DefaultTwillSpecification(name, runnables, orders, eventHandler);
+ return new DefaultTwillSpecification(name, runnables, orders, placementPolicies, eventHandler);
}
static final class TwillSpecificationOrderCoder implements JsonSerializer<TwillSpecification.Order>,
@@ -101,6 +107,34 @@ final class TwillSpecificationCodec implements JsonSerializer<TwillSpecification
}
}
+ static final class TwillSpecificationPlacementPolicyCoder implements
+ JsonSerializer<TwillSpecification.PlacementPolicy>, JsonDeserializer<TwillSpecification.PlacementPolicy> {
+
+ @Override
+ public JsonElement serialize(TwillSpecification.PlacementPolicy src, Type typeOfSrc,
+ JsonSerializationContext context) {
+ JsonObject json = new JsonObject();
+ json.add("names", context.serialize(src.getNames(), new TypeToken<Set<String>>() { }.getType()));
+ json.addProperty("type", src.getType().name());
+ json.add("hosts", context.serialize(src.getHosts(), new TypeToken<Set<String>>() { }.getType()));
+ json.add("racks", context.serialize(src.getRacks(), new TypeToken<Set<String>>() { }.getType()));
+ return json;
+ }
+
+ @Override
+ public TwillSpecification.PlacementPolicy deserialize(JsonElement json, Type typeOfT,
+ JsonDeserializationContext context)
+ throws JsonParseException {
+ JsonObject jsonObj = json.getAsJsonObject();
+ Set<String> names = context.deserialize(jsonObj.get("names"), new TypeToken<Set<String>>() { }.getType());
+ TwillSpecification.PlacementPolicy.Type type =
+ TwillSpecification.PlacementPolicy.Type.valueOf(jsonObj.get("type").getAsString());
+ Set<String> hosts = context.deserialize(jsonObj.get("hosts"), new TypeToken<Set<String>>() { }.getType());
+ Set<String> racks = context.deserialize(jsonObj.get("racks"), new TypeToken<Set<String>>() { }.getType());
+ return new DefaultTwillSpecification.DefaultPlacementPolicy(names, type, new Hosts(hosts), new Racks(racks));
+ }
+ }
+
static final class EventHandlerSpecificationCoder implements JsonSerializer<EventHandlerSpecification>,
JsonDeserializer<EventHandlerSpecification> {
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-core/src/test/java/org/apache/twill/internal/json/ResourceSpecificationCodecTest.java
----------------------------------------------------------------------
diff --git a/twill-core/src/test/java/org/apache/twill/internal/json/ResourceSpecificationCodecTest.java b/twill-core/src/test/java/org/apache/twill/internal/json/ResourceSpecificationCodecTest.java
index e8b0eff..2e949d4 100644
--- a/twill-core/src/test/java/org/apache/twill/internal/json/ResourceSpecificationCodecTest.java
+++ b/twill-core/src/test/java/org/apache/twill/internal/json/ResourceSpecificationCodecTest.java
@@ -26,8 +26,6 @@ import org.junit.Assert;
import org.junit.Test;
import org.unitils.reflectionassert.ReflectionAssert;
-import java.util.Arrays;
-
/**
* Maybe this checkstyle rule needs to be removed.
*/
@@ -45,13 +43,10 @@ public class ResourceSpecificationCodecTest {
"\"memorySize\":1024," +
"\"instances\":2," +
"\"uplink\":100," +
- "\"downlink\":100," +
- "\"hosts\":[\"one1\",\"two2\"]," +
- "\"racks\":[\"three3\"]" +
+ "\"downlink\":100" +
"}";
final ResourceSpecification expected =
- new DefaultResourceSpecification(2, 1024, 2, 100, 100,
- Arrays.asList("one1", "two2"), Arrays.asList("three3"));
+ new DefaultResourceSpecification(2, 1024, 2, 100, 100);
final String actualString = gson.toJson(expected);
Assert.assertEquals(expectedString, actualString);
@@ -71,12 +66,9 @@ public class ResourceSpecificationCodecTest {
.setInstances(3)
.setUplink(10, ResourceSpecification.SizeUnit.GIGA)
.setDownlink(5, ResourceSpecification.SizeUnit.GIGA)
- .setHosts("a1", "b2", "c3")
- .setRacks("r2")
.build();
final DefaultResourceSpecification expected =
- new DefaultResourceSpecification(5, 4096, 3, 10240, 5120,
- Arrays.asList("a1", "b2", "c3"), Arrays.asList("r2"));
+ new DefaultResourceSpecification(5, 4096, 3, 10240, 5120);
ReflectionAssert.assertLenientEquals(expected, actual);
}
@@ -88,12 +80,9 @@ public class ResourceSpecificationCodecTest {
.setInstances(3)
.setUplink(10, ResourceSpecification.SizeUnit.GIGA)
.setDownlink(5, ResourceSpecification.SizeUnit.GIGA)
- .setHosts(Arrays.asList("a1", "b2", "c3"))
- .setRacks(Arrays.asList("r2"))
.build();
final DefaultResourceSpecification expected =
- new DefaultResourceSpecification(5, 4096, 3, 10240, 5120,
- Arrays.asList("a1", "b2", "c3"), Arrays.asList("r2"));
+ new DefaultResourceSpecification(5, 4096, 3, 10240, 5120);
ReflectionAssert.assertLenientEquals(expected, actual);
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
index 9b66f67..a990cc0 100644
--- a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
+++ b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
@@ -100,6 +100,11 @@ public final class Hadoop20YarnAMClient extends AbstractYarnAMClient<AMRMClient.
}
@Override
+ public int getNMPort() {
+ return Integer.parseInt(System.getenv().get(ApplicationConstants.NM_PORT_ENV));
+ }
+
+ @Override
protected Resource adjustCapability(Resource resource) {
int cores = YarnUtils.getVirtualCores(resource);
int updatedCores = Math.max(Math.min(cores, YarnUtils.getVirtualCores(maxCapability)),
@@ -123,7 +128,9 @@ public final class Hadoop20YarnAMClient extends AbstractYarnAMClient<AMRMClient.
@Override
protected AMRMClient.ContainerRequest createContainerRequest(Priority priority, Resource capability,
- @Nullable String[] hosts, @Nullable String[] racks) {
+ @Nullable String[] hosts, @Nullable String[] racks,
+ boolean relaxLocality) {
+ // Ignore relaxLocality param since the corresponding support is not present in Hadoop 2.0.
return new AMRMClient.ContainerRequest(capability, hosts, racks, priority, 1);
}
@@ -138,6 +145,14 @@ public final class Hadoop20YarnAMClient extends AbstractYarnAMClient<AMRMClient.
}
@Override
+ protected void updateBlacklist(List<String> blacklistAdditions, List<String> blacklistRemovals) {
+ // At most logs a warning; no blacklist update is made since Blacklist is not supported in Hadoop-2.0.
+ if (recordUnsupportedFeature("blacklist")) {
+ LOG.warn("Blacklist is not supported in Hadoop 2.0");
+ }
+ }
+
+ @Override
protected AllocateResult doAllocate(float progress) throws Exception {
AllocationResponse response = amrmClient.allocate(progress);
List<RunnableProcessLauncher> launchers
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java
index 6fdd99e..3f7422d 100644
--- a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java
+++ b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.DelegationToken;
+import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.YarnClient;
import org.apache.hadoop.yarn.client.YarnClientImpl;
@@ -45,6 +46,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
+import java.util.List;
/**
*
@@ -156,6 +158,11 @@ public final class Hadoop20YarnAppClient extends AbstractIdleService implements
}
@Override
+ public List<NodeReport> getNodeReports() throws Exception {
+ return this.yarnClient.getNodeReports();
+ }
+
+ @Override
protected void startUp() throws Exception {
yarnClient.start();
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java
index 78a5135..c5ac458 100644
--- a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java
+++ b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java
@@ -42,11 +42,11 @@ import javax.annotation.Nullable;
/**
*
*/
-public final class Hadoop21YarnAMClient extends AbstractYarnAMClient<AMRMClient.ContainerRequest> {
+public class Hadoop21YarnAMClient extends AbstractYarnAMClient<AMRMClient.ContainerRequest> {
private static final Logger LOG = LoggerFactory.getLogger(Hadoop21YarnAMClient.class);
- private static final Function<ContainerStatus, YarnContainerStatus> STATUS_TRANSFORM;
+ protected static final Function<ContainerStatus, YarnContainerStatus> STATUS_TRANSFORM;
static {
STATUS_TRANSFORM = new Function<ContainerStatus, YarnContainerStatus>() {
@@ -57,9 +57,9 @@ public final class Hadoop21YarnAMClient extends AbstractYarnAMClient<AMRMClient.
};
}
- private final AMRMClient<AMRMClient.ContainerRequest> amrmClient;
- private final Hadoop21YarnNMClient nmClient;
- private Resource maxCapability;
+ protected final AMRMClient<AMRMClient.ContainerRequest> amrmClient;
+ protected final Hadoop21YarnNMClient nmClient;
+ protected Resource maxCapability;
public Hadoop21YarnAMClient(Configuration conf) {
super(ApplicationConstants.Environment.CONTAINER_ID.name());
@@ -95,9 +95,15 @@ public final class Hadoop21YarnAMClient extends AbstractYarnAMClient<AMRMClient.
}
@Override
+ public int getNMPort() {
+ return Integer.parseInt(System.getenv().get(ApplicationConstants.Environment.NM_PORT.name()));
+ }
+
+ @Override
protected AMRMClient.ContainerRequest createContainerRequest(Priority priority, Resource capability,
- @Nullable String[] hosts, @Nullable String[] racks) {
- return new AMRMClient.ContainerRequest(capability, hosts, racks, priority);
+ @Nullable String[] hosts, @Nullable String[] racks,
+ boolean relaxLocality) {
+ return new AMRMClient.ContainerRequest(capability, hosts, racks, priority, relaxLocality);
}
@Override
@@ -111,6 +117,14 @@ public final class Hadoop21YarnAMClient extends AbstractYarnAMClient<AMRMClient.
}
@Override
+ protected void updateBlacklist(List<String> blacklistAdditions, List<String> blacklistRemovals) {
+ // At most logs a warning; no blacklist update is made since Blacklist is not supported in Hadoop-2.1 AMRMClient.
+ if (recordUnsupportedFeature("blacklist")) {
+ LOG.warn("Blacklist is not supported in Hadoop 2.1 AMRMClient");
+ }
+ }
+
+ @Override
protected AllocateResult doAllocate(float progress) throws Exception {
AllocateResponse allocateResponse = amrmClient.allocate(progress);
List<RunnableProcessLauncher> launchers
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java
index 20558bb..9b70a86 100644
--- a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java
+++ b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
@@ -41,6 +42,8 @@ import org.apache.twill.internal.appmaster.ApplicationSubmitter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.List;
+
/**
*
*/
@@ -136,6 +139,11 @@ public final class Hadoop21YarnAppClient extends AbstractIdleService implements
}
@Override
+ public List<NodeReport> getNodeReports() throws Exception {
+ return this.yarnClient.getNodeReports();
+ }
+
+ @Override
protected void startUp() throws Exception {
yarnClient.start();
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-yarn/src/main/hadoop22/org/apache/twill/internal/yarn/Hadoop22YarnAMClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop22/org/apache/twill/internal/yarn/Hadoop22YarnAMClient.java b/twill-yarn/src/main/hadoop22/org/apache/twill/internal/yarn/Hadoop22YarnAMClient.java
new file mode 100644
index 0000000..3ee99e8
--- /dev/null
+++ b/twill-yarn/src/main/hadoop22/org/apache/twill/internal/yarn/Hadoop22YarnAMClient.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Wrapper class for AMRMClient for Hadoop version 2.2 or greater.
+ */
+public final class Hadoop22YarnAMClient extends Hadoop21YarnAMClient {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Hadoop22YarnAMClient.class);
+
+ public Hadoop22YarnAMClient(Configuration conf) {
+ super(conf);
+ }
+
+ @Override
+ protected void updateBlacklist(List<String> blacklistAdditions, List<String> blacklistRemovals) {
+ LOG.debug("Blacklist Additions: {} , Blacklist Removals: {}", blacklistAdditions, blacklistRemovals);
+ amrmClient.updateBlacklist(blacklistAdditions, blacklistRemovals);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/AllocationSpecification.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/AllocationSpecification.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/AllocationSpecification.java
new file mode 100644
index 0000000..38cb32c
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/AllocationSpecification.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.appmaster;
+
+import com.google.common.base.Objects;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+/**
+ * This class defines how the containers should be allocated.
+ */
+public class AllocationSpecification {
+
+ /**
+ * Defines different types of allocation strategies.
+ */
+ enum Type {
+ ALLOCATE_ONE_INSTANCE_AT_A_TIME,
+ DEFAULT
+ }
+
+ /**
+ * Resource specification of runnables.
+ */
+ private Resource resource;
+
+ /**
+ * Allocation strategy Type.
+ */
+ private Type type;
+
+ /**
+ * Name of runnable. Set to null if the class represents more than one runnable.
+ */
+ private String runnableName;
+
+ /**
+ * Instance number for the runnable. Set to 0 if the class represents more than one instance / runnable.
+ */
+ private int instanceId;
+
+ public AllocationSpecification(Resource resource) {
+ this(resource, Type.DEFAULT, null, 0);
+ }
+
+ public AllocationSpecification(Resource resource, Type type, String runnableName, int instanceId) {
+ this.resource = resource;
+ this.type = type;
+ this.runnableName = runnableName;
+ this.instanceId = instanceId;
+ }
+
+ public Resource getResource() {
+ return this.resource;
+ }
+
+ public Type getType() {
+ return this.type;
+ }
+
+ public String getRunnableName() {
+ return this.runnableName;
+ }
+
+ public int getInstanceId() {
+ return this.instanceId;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (!(obj instanceof AllocationSpecification)) {
+ return false;
+ }
+ AllocationSpecification other = (AllocationSpecification) obj;
+ return (instanceId == other.instanceId) &&
+ Objects.equal(resource, other.resource) &&
+ Objects.equal(type, other.type) &&
+ Objects.equal(runnableName, other.runnableName);
+ }
+
+ @Override
+ public int hashCode() {
+ return this.resource.hashCode();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
index c25a002..b4f9997 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
@@ -25,7 +25,6 @@ import com.google.common.base.Throwables;
import com.google.common.collect.HashMultiset;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -64,6 +63,7 @@ import org.apache.twill.filesystem.Location;
import org.apache.twill.internal.AbstractTwillService;
import org.apache.twill.internal.Configs;
import org.apache.twill.internal.Constants;
+import org.apache.twill.internal.ContainerInfo;
import org.apache.twill.internal.DefaultTwillRunResources;
import org.apache.twill.internal.EnvKeys;
import org.apache.twill.internal.JvmOptions;
@@ -129,6 +129,7 @@ public final class ApplicationMasterService extends AbstractTwillService {
private final int reservedMemory;
private final EventHandler eventHandler;
private final Location applicationLocation;
+ private final PlacementPolicyManager placementPolicyManager;
private EmbeddedKafkaServer kafkaServer;
private Queue<RunnableContainerRequest> runnableContainerRequests;
@@ -146,6 +147,7 @@ public final class ApplicationMasterService extends AbstractTwillService {
this.credentials = createCredentials();
this.jvmOpts = loadJvmOptions();
this.reservedMemory = getReservedMemory();
+ this.placementPolicyManager = new PlacementPolicyManager(twillSpec.getPlacementPolicies());
amLiveNode = new ApplicationMasterLiveNodeData(Integer.parseInt(System.getenv(EnvKeys.YARN_APP_ID)),
Long.parseLong(System.getenv(EnvKeys.YARN_APP_ID_CLUSTER_TIME)),
@@ -348,7 +350,7 @@ public final class ApplicationMasterService extends AbstractTwillService {
private void doRun() throws Exception {
// The main loop
- Map.Entry<Resource, ? extends Collection<RuntimeSpecification>> currentRequest = null;
+ Map.Entry<AllocationSpecification, ? extends Collection<RuntimeSpecification>> currentRequest = null;
final Queue<ProvisionRequest> provisioning = Lists.newLinkedList();
YarnAMClient.AllocateHandler allocateHandler = new YarnAMClient.AllocateHandler() {
@@ -363,6 +365,8 @@ public final class ApplicationMasterService extends AbstractTwillService {
}
};
+ long requestStartTime = 0;
+ boolean isRequestRelaxed = false;
long nextTimeoutCheck = System.currentTimeMillis() + Constants.PROVISION_TIMEOUT;
while (isRunning()) {
// Call allocate. It has to be made at first in order to be able to get cluster resource availability.
@@ -383,10 +387,25 @@ public final class ApplicationMasterService extends AbstractTwillService {
runnableContainerRequests.poll();
}
}
+
// Nothing in provision, makes the next batch of provision request
if (provisioning.isEmpty() && currentRequest != null) {
- addContainerRequests(currentRequest.getKey(), currentRequest.getValue(), provisioning);
+ manageBlacklist(currentRequest);
+ addContainerRequests(currentRequest.getKey().getResource(), currentRequest.getValue(), provisioning,
+ currentRequest.getKey().getType());
currentRequest = null;
+ requestStartTime = System.currentTimeMillis();
+ isRequestRelaxed = false;
+ }
+
+ // Check for provision request timeout i.e. check if any provision request has been pending
+ // for more than the designated time. On timeout, relax the request constraints.
+ if (!provisioning.isEmpty() && !isRequestRelaxed &&
+ (System.currentTimeMillis() - requestStartTime) > Constants.CONSTRAINED_PROVISION_REQUEST_TIMEOUT) {
+ LOG.info("Relaxing provisioning constraints for request {}", provisioning.peek().getRequestId());
+ // Clear the blacklist for the pending provision request(s).
+ clearBlacklist();
+ isRequestRelaxed = true;
}
nextTimeoutCheck = checkProvisionTimeout(nextTimeoutCheck);
@@ -398,6 +417,37 @@ public final class ApplicationMasterService extends AbstractTwillService {
}
/**
+ * Resets the blacklist, then blacklists hosts running fellow runnables when allocating a DISTRIBUTED runnable instance.
+ */
+ private void manageBlacklist(Map.Entry<AllocationSpecification, ? extends Collection<RuntimeSpecification>> request) {
+ clearBlacklist();
+
+ //Check the allocation strategy
+ AllocationSpecification currentAllocationSpecification = request.getKey();
+ if (currentAllocationSpecification.getType().equals(AllocationSpecification.Type.ALLOCATE_ONE_INSTANCE_AT_A_TIME)) {
+
+ //Check the placement policy
+ TwillSpecification.PlacementPolicy placementPolicy =
+ placementPolicyManager.getPlacementPolicy(currentAllocationSpecification.getRunnableName());
+ if (placementPolicy.getType().equals(TwillSpecification.PlacementPolicy.Type.DISTRIBUTED)) {
+
+ //Update blacklist with hosts which are running DISTRIBUTED runnables
+ for (String runnable : placementPolicyManager.getFellowRunnables(request.getKey().getRunnableName())) {
+ Collection<ContainerInfo> containerStats =
+ runningContainers.getContainerInfo(runnable);
+ for (ContainerInfo containerInfo : containerStats) {
+ // Yarn Resource Manager may include port in the node name depending on the setting
+ // YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME. It is safe to add both
+ // the names (with and without port) to the blacklist.
+ addToBlacklist(containerInfo.getHost().getHostName());
+ addToBlacklist(containerInfo.getHost().getHostName() + ":" + containerInfo.getPort());
+ }
+ }
+ }
+ }
+ }
+
+ /**
* Handling containers that are completed.
*/
private void handleCompleted(List<YarnContainerStatus> completedContainersStatuses) {
@@ -433,13 +483,16 @@ public final class ApplicationMasterService extends AbstractTwillService {
// Invoke event handler for provision request timeout
Map<String, ExpectedContainers.ExpectedCount> expiredRequests = expectedContainers.getAll();
Map<String, Integer> runningCounts = runningContainers.countAll();
+ Map<String, Integer> completedContainerCount = runningContainers.getCompletedContainerCount();
List<EventHandler.TimeoutEvent> timeoutEvents = Lists.newArrayList();
for (Map.Entry<String, ExpectedContainers.ExpectedCount> entry : expiredRequests.entrySet()) {
String runnableName = entry.getKey();
ExpectedContainers.ExpectedCount expectedCount = entry.getValue();
int runningCount = runningCounts.containsKey(runnableName) ? runningCounts.get(runnableName) : 0;
- if (expectedCount.getCount() != runningCount) {
+ int completedCount = completedContainerCount.containsKey(runnableName) ?
+ completedContainerCount.get(runnableName) : 0;
+ if (expectedCount.getCount() > runningCount + completedCount) {
timeoutEvents.add(new EventHandler.TimeoutEvent(runnableName, expectedCount.getCount(),
runningCount, expectedCount.getTimestamp()));
}
@@ -488,42 +541,101 @@ public final class ApplicationMasterService extends AbstractTwillService {
private Queue<RunnableContainerRequest> initContainerRequests() {
// Orderly stores container requests.
Queue<RunnableContainerRequest> requests = Lists.newLinkedList();
- // For each order in the twillSpec, create container request for each runnable.
+ // For each order in the twillSpec, create container request for runnables, depending on Placement policy.
for (TwillSpecification.Order order : twillSpec.getOrders()) {
- // Group container requests based on resource requirement.
- ImmutableMultimap.Builder<Resource, RuntimeSpecification> builder = ImmutableMultimap.builder();
- for (String runnableName : order.getNames()) {
+ Set<String> distributedRunnables = placementPolicyManager.getDistributedRunnables(order.getNames());
+ Set<String> defaultRunnables = Sets.newHashSet();
+ defaultRunnables.addAll(order.getNames());
+ defaultRunnables.removeAll(distributedRunnables);
+ Map<AllocationSpecification, Collection<RuntimeSpecification>> requestsMap = Maps.newHashMap();
+ for (String runnableName : distributedRunnables) {
RuntimeSpecification runtimeSpec = twillSpec.getRunnables().get(runnableName);
Resource capability = createCapability(runtimeSpec.getResourceSpecification());
- builder.put(capability, runtimeSpec);
+ for (int instanceId = 0; instanceId < runtimeSpec.getResourceSpecification().getInstances(); instanceId++) {
+ AllocationSpecification allocationSpecification =
+ new AllocationSpecification(capability, AllocationSpecification.Type.ALLOCATE_ONE_INSTANCE_AT_A_TIME,
+ runnableName, instanceId);
+ addAllocationSpecification(allocationSpecification, requestsMap, runtimeSpec);
+ }
}
- requests.add(new RunnableContainerRequest(order.getType(), builder.build()));
+ for (String runnableName : defaultRunnables) {
+ RuntimeSpecification runtimeSpec = twillSpec.getRunnables().get(runnableName);
+ Resource capability = createCapability(runtimeSpec.getResourceSpecification());
+ AllocationSpecification allocationSpecification = new AllocationSpecification(capability);
+ addAllocationSpecification(allocationSpecification, requestsMap, runtimeSpec);
+ }
+ requests.add(new RunnableContainerRequest(order.getType(), requestsMap));
}
return requests;
}
/**
+ * Groups a runtime specification under its allocation specification for a {@link RunnableContainerRequest}.
+ */
+ private void addAllocationSpecification(AllocationSpecification allocationSpecification,
+ Map<AllocationSpecification, Collection<RuntimeSpecification>> map,
+ RuntimeSpecification runtimeSpec) {
+ if (!map.containsKey(allocationSpecification)) {
+ map.put(allocationSpecification, Lists.<RuntimeSpecification>newLinkedList());
+ }
+ map.get(allocationSpecification).add(runtimeSpec);
+ }
+
+ /**
* Adds container requests with the given resource capability for each runtime.
*/
private void addContainerRequests(Resource capability,
Collection<RuntimeSpecification> runtimeSpecs,
- Queue<ProvisionRequest> provisioning) {
+ Queue<ProvisionRequest> provisioning,
+ AllocationSpecification.Type allocationType) {
for (RuntimeSpecification runtimeSpec : runtimeSpecs) {
String name = runtimeSpec.getName();
int newContainers = expectedContainers.getExpected(name) - runningContainers.count(name);
if (newContainers > 0) {
+ if (allocationType.equals(AllocationSpecification.Type.ALLOCATE_ONE_INSTANCE_AT_A_TIME)) {
+ //Spawning 1 instance at a time
+ newContainers = 1;
+ }
+ TwillSpecification.PlacementPolicy placementPolicy = placementPolicyManager.getPlacementPolicy(name);
+ Set<String> hosts = Sets.newHashSet();
+ Set<String> racks = Sets.newHashSet();
+ if (placementPolicy != null) {
+ hosts = placementPolicy.getHosts();
+ racks = placementPolicy.getRacks();
+ }
// TODO: Allow user to set priority?
- LOG.info("Request {} container with capability {}", newContainers, capability);
+ LOG.info("Request {} container with capability {} for runnable {}", newContainers, capability, name);
String requestId = amClient.addContainerRequest(capability, newContainers)
- .addHosts(runtimeSpec.getResourceSpecification().getHosts())
- .addRacks(runtimeSpec.getResourceSpecification().getRacks())
+ .addHosts(hosts)
+ .addRacks(racks)
.setPriority(0).apply();
- provisioning.add(new ProvisionRequest(runtimeSpec, requestId, newContainers));
+ provisioning.add(new ProvisionRequest(runtimeSpec, requestId, newContainers, allocationType));
}
}
}
/**
+ * Add a resource to the blacklist.
+ */
+ private void addToBlacklist(String resource) {
+ amClient.addToBlacklist(resource);
+ }
+
+ /**
+ * Remove a resource from the blacklist.
+ */
+ private void removeFromBlacklist(String resource) {
+ amClient.removeFromBlacklist(resource);
+ }
+
+ /**
+ * Clears the resource blacklist.
+ */
+ private void clearBlacklist() {
+ amClient.clearBlacklist();
+ }
+
+ /**
* Launches runnables in the provisioned containers.
*/
private void launchRunnable(List<? extends ProcessLauncher<YarnContainerInfo>> launchers,
@@ -564,9 +676,12 @@ public final class ApplicationMasterService extends AbstractTwillService {
amClient.completeContainerRequest(provisionRequest.getRequestId());
}
+ if (expectedContainers.getExpected(runnableName) == runningContainers.count(runnableName) ||
+ provisioning.peek().getType().equals(AllocationSpecification.Type.ALLOCATE_ONE_INSTANCE_AT_A_TIME)) {
+ provisioning.poll();
+ }
if (expectedContainers.getExpected(runnableName) == runningContainers.count(runnableName)) {
LOG.info("Runnable " + runnableName + " fully provisioned with " + containerCount + " instances.");
- provisioning.poll();
}
}
}
@@ -708,7 +823,7 @@ public final class ApplicationMasterService extends AbstractTwillService {
}
} else {
// Increase the number of instances
- runnableContainerRequests.add(createRunnableContainerRequest(runnableName));
+ runnableContainerRequests.add(createRunnableContainerRequest(runnableName, newCount - oldCount));
}
} finally {
runningContainers.sendToRunnable(runnableName, message, completion);
@@ -723,6 +838,11 @@ public final class ApplicationMasterService extends AbstractTwillService {
}
private RunnableContainerRequest createRunnableContainerRequest(final String runnableName) {
+ return createRunnableContainerRequest(runnableName, 1);
+ }
+
+ private RunnableContainerRequest createRunnableContainerRequest(final String runnableName,
+ final int numberOfInstances) {
// Find the current order of the given runnable in order to create a RunnableContainerRequest.
TwillSpecification.Order order = Iterables.find(twillSpec.getOrders(), new Predicate<TwillSpecification.Order>() {
@Override
@@ -733,7 +853,20 @@ public final class ApplicationMasterService extends AbstractTwillService {
RuntimeSpecification runtimeSpec = twillSpec.getRunnables().get(runnableName);
Resource capability = createCapability(runtimeSpec.getResourceSpecification());
- return new RunnableContainerRequest(order.getType(), ImmutableMultimap.of(capability, runtimeSpec));
+ Map<AllocationSpecification, Collection<RuntimeSpecification>> requestsMap = Maps.newHashMap();
+ if (placementPolicyManager.getPlacementPolicyType(runnableName).equals(
+ TwillSpecification.PlacementPolicy.Type.DISTRIBUTED)) {
+ for (int instanceId = 0; instanceId < numberOfInstances; instanceId++) {
+ AllocationSpecification allocationSpecification =
+ new AllocationSpecification(capability, AllocationSpecification.Type.ALLOCATE_ONE_INSTANCE_AT_A_TIME,
+ runnableName, instanceId);
+ addAllocationSpecification(allocationSpecification, requestsMap, runtimeSpec);
+ }
+ } else {
+ AllocationSpecification allocationSpecification = new AllocationSpecification(capability);
+ addAllocationSpecification(allocationSpecification, requestsMap, runtimeSpec);
+ }
+ return new RunnableContainerRequest(order.getType(), requestsMap);
}
private Runnable getMessageCompletion(final String messageId, final SettableFuture<String> future) {
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/PlacementPolicyManager.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/PlacementPolicyManager.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/PlacementPolicyManager.java
new file mode 100644
index 0000000..91a58bb
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/PlacementPolicyManager.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.twill.internal.appmaster;
+
+import com.google.common.collect.Sets;
+import org.apache.twill.api.TwillSpecification;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This class provides helper functions for operating on a set of Placement Policies.
+ */
+public class PlacementPolicyManager {
+ List<TwillSpecification.PlacementPolicy> placementPolicies;
+
+ public PlacementPolicyManager(List<TwillSpecification.PlacementPolicy> placementPolicies) {
+ this.placementPolicies = placementPolicies;
+ }
+
+ /**
+ * Given a set of runnables, get all runnables which belong to DISTRIBUTED placement policies.
+ * @param givenRunnables Set of runnables.
+ * @return Subset of runnables, which belong to DISTRIBUTED placement policies.
+ */
+ public Set<String> getDistributedRunnables(Set<String> givenRunnables) {
+ Set<String> distributedRunnables = getAllDistributedRunnables();
+ distributedRunnables.retainAll(givenRunnables);
+ return distributedRunnables;
+ }
+
+ /**
+ * Given a runnable, get the type of placement policy. Returns DEFAULT if no placement policy is specified.
+ * @param runnableName Name of runnable.
+ * @return Placement policy type of the runnable.
+ */
+ public TwillSpecification.PlacementPolicy.Type getPlacementPolicyType(String runnableName) {
+ for (TwillSpecification.PlacementPolicy placementPolicy : placementPolicies) {
+ if (placementPolicy.getNames().contains(runnableName)) {
+ return placementPolicy.getType();
+ }
+ }
+ return TwillSpecification.PlacementPolicy.Type.DEFAULT;
+ }
+
+ /**
+ * Get all runnables which belong to the same Placement policy as the given runnable.
+ * @param runnableName Name of runnable.
+ * @return Set of runnables, with same placement policy.
+ */
+ public Set<String> getFellowRunnables(String runnableName) {
+ for (TwillSpecification.PlacementPolicy placementPolicy : placementPolicies) {
+ if (placementPolicy.getNames().contains(runnableName)) {
+ return placementPolicy.getNames();
+ }
+ }
+ return Collections.emptySet();
+ }
+
+ /**
+ * Get the placement policy of the runnable.
+ * Returns null if runnable does not belong to a placement policy.
+ * @param runnableName Name of runnable.
+ * @return Placement policy of the runnable.
+ */
+ public TwillSpecification.PlacementPolicy getPlacementPolicy(String runnableName) {
+ for (TwillSpecification.PlacementPolicy placementPolicy : placementPolicies) {
+ if (placementPolicy.getNames().contains(runnableName)) {
+ return placementPolicy;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Gets all runnables which belong to DISTRIBUTED placement policies.
+ */
+ private Set<String> getAllDistributedRunnables() {
+ Set<String> distributedRunnables = Sets.newHashSet();
+ for (TwillSpecification.PlacementPolicy placementPolicy : placementPolicies) {
+ if (placementPolicy.getType().equals(TwillSpecification.PlacementPolicy.Type.DISTRIBUTED)) {
+ distributedRunnables.addAll(placementPolicy.getNames());
+ }
+ }
+ return distributedRunnables;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ProvisionRequest.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ProvisionRequest.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ProvisionRequest.java
index 002d2a5..a6eba3e 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ProvisionRequest.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ProvisionRequest.java
@@ -26,11 +26,18 @@ final class ProvisionRequest {
private final RuntimeSpecification runtimeSpec;
private final String requestId;
private int requestCount;
+ private final AllocationSpecification.Type type;
ProvisionRequest(RuntimeSpecification runtimeSpec, String requestId, int requestCount) {
+ this(runtimeSpec, requestId, requestCount, AllocationSpecification.Type.DEFAULT);
+ }
+
+ ProvisionRequest(RuntimeSpecification runtimeSpec, String requestId, int requestCount,
+ AllocationSpecification.Type type) {
this.runtimeSpec = runtimeSpec;
this.requestId = requestId;
this.requestCount = requestCount;
+ this.type = type;
}
RuntimeSpecification getRuntimeSpec() {
@@ -49,4 +56,8 @@ final class ProvisionRequest {
requestCount--;
return requestCount == 0;
}
+
+ AllocationSpecification.Type getType() {
+ return this.type;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableContainerRequest.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableContainerRequest.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableContainerRequest.java
index de199ad..2105629 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableContainerRequest.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableContainerRequest.java
@@ -20,7 +20,6 @@ package org.apache.twill.internal.appmaster;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.twill.api.RuntimeSpecification;
import org.apache.twill.api.TwillSpecification;
@@ -34,12 +33,12 @@ import java.util.Map;
*/
final class RunnableContainerRequest {
private final TwillSpecification.Order.Type orderType;
- private final Iterator<Map.Entry<Resource, Collection<RuntimeSpecification>>> requests;
+ private final Iterator<Map.Entry<AllocationSpecification, Collection<RuntimeSpecification>>> requests;
RunnableContainerRequest(TwillSpecification.Order.Type orderType,
- Multimap<Resource, RuntimeSpecification> requests) {
+ Map<AllocationSpecification, Collection<RuntimeSpecification>> requests) {
this.orderType = orderType;
- this.requests = requests.asMap().entrySet().iterator();
+ this.requests = requests.entrySet().iterator();
}
TwillSpecification.Order.Type getOrderType() {
@@ -51,8 +50,8 @@ final class RunnableContainerRequest {
* @return The {@link Resource} and {@link Collection} of {@link RuntimeSpecification} or
* {@code null} if there is no more request.
*/
- Map.Entry<Resource, ? extends Collection<RuntimeSpecification>> takeRequest() {
- Map.Entry<Resource, Collection<RuntimeSpecification>> next = Iterators.getNext(requests, null);
+ Map.Entry<AllocationSpecification, ? extends Collection<RuntimeSpecification>> takeRequest() {
+ Map.Entry<AllocationSpecification, Collection<RuntimeSpecification>> next = Iterators.getNext(requests, null);
return next == null ? null : Maps.immutableEntry(next.getKey(), ImmutableList.copyOf(next.getValue()));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableProcessLauncher.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableProcessLauncher.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableProcessLauncher.java
index 2126541..0e3aea5 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableProcessLauncher.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableProcessLauncher.java
@@ -68,8 +68,9 @@ public final class RunnableProcessLauncher extends AbstractYarnProcessLauncher<Y
launchContext.setEnvironment(env);
- LOG.info("Launching in container {} at {}, {}",
- containerInfo.getId(), containerInfo.getHost(), launchContext.getCommands());
+ LOG.info("Launching in container {} at {}:{}, {}",
+ containerInfo.getId(), containerInfo.getHost().getHostName(),
+ containerInfo.getPort(), launchContext.getCommands());
final Cancellable cancellable = nmClient.start(containerInfo, launchContext);
launched = true;
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
index 5267616..383e5e0 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
@@ -20,10 +20,13 @@ package org.apache.twill.internal.appmaster;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
import com.google.common.collect.Multiset;
import com.google.common.collect.Table;
import com.google.common.util.concurrent.FutureCallback;
@@ -53,6 +56,7 @@ import org.slf4j.LoggerFactory;
import java.util.BitSet;
import java.util.Collection;
+import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
@@ -85,20 +89,24 @@ final class RunningContainers {
// Map from runnableName to a BitSet, with the <instanceId> bit turned on for having an instance running.
private final Map<String, BitSet> runnableInstances;
+ private final Map<String, Integer> completedContainerCount;
private final DefaultResourceReport resourceReport;
private final Deque<String> startSequence;
private final Lock containerLock;
private final Condition containerChange;
private final ZKClient zkClient;
+ private final Multimap<String, ContainerInfo> containerStats;
RunningContainers(String appId, TwillRunResources appMasterResources, ZKClient zookeeperClient) {
containers = HashBasedTable.create();
runnableInstances = Maps.newHashMap();
+ completedContainerCount = Maps.newHashMap();
startSequence = Lists.newLinkedList();
containerLock = new ReentrantLock();
containerChange = containerLock.newCondition();
resourceReport = new DefaultResourceReport(appId, appMasterResources);
zkClient = zookeeperClient;
+ containerStats = HashMultimap.create();
}
/**
@@ -128,6 +136,7 @@ final class RunningContainers {
containerInfo.getHost().getHostName(),
controller);
resourceReport.addRunResources(runnableName, resources);
+ containerStats.put(runnableName, containerInfo);
if (startSequence.isEmpty() || !runnableName.equals(startSequence.peekLast())) {
startSequence.addLast(runnableName);
@@ -157,6 +166,15 @@ final class RunningContainers {
}
/**
+ * Given a runnable name, returns a collection of {@link org.apache.twill.internal.ContainerInfo} for its instances.
+ * @param runnableName name of a runnable.
+ * @return a collection of {@link org.apache.twill.internal.ContainerInfo} for instances of a runnable.
+ */
+ Collection<ContainerInfo> getContainerInfo(String runnableName) {
+ return containerStats.get(runnableName);
+ }
+
+ /**
* Stops and removes the last running container of the given runnable.
*/
void removeLast(String runnableName) {
@@ -186,6 +204,7 @@ final class RunningContainers {
LOG.info("Stopping service: {} {}", runnableName, lastController.getRunId());
lastController.stopAndWait();
containers.remove(runnableName, lastContainerId);
+ removeContainerInfo(lastContainerId);
removeInstanceId(runnableName, maxInstanceId);
resourceReport.removeRunnableResources(runnableName, lastContainerId);
containerChange.signalAll();
@@ -232,6 +251,18 @@ final class RunningContainers {
}
}
+ /**
+ * Returns a Map from runnable name to the number of its containers that completed successfully.
+ */
+ Map<String, Integer> getCompletedContainerCount() {
+ containerLock.lock();
+ try {
+ return ImmutableMap.copyOf(completedContainerCount);
+ } finally {
+ containerLock.unlock();
+ }
+ }
+
void sendToAll(Message message, Runnable completion) {
containerLock.lock();
try {
@@ -293,6 +324,7 @@ final class RunningContainers {
}
containers.clear();
runnableInstances.clear();
+ containerStats.clear();
} finally {
containerLock.unlock();
}
@@ -320,6 +352,7 @@ final class RunningContainers {
ContainerState state = status.getState();
try {
+ removeContainerInfo(containerId);
Map<String, TwillContainerController> lookup = containers.column(containerId);
if (lookup.isEmpty()) {
// It's OK because if a container is stopped through removeLast, this would be empty.
@@ -346,6 +379,12 @@ final class RunningContainers {
TwillContainerController controller = completedEntry.getValue();
controller.completed(exitStatus);
+ if (exitStatus == ContainerExitCodes.SUCCESS) {
+ if (!completedContainerCount.containsKey(runnableName)) {
+ completedContainerCount.put(runnableName, 0);
+ }
+ completedContainerCount.put(runnableName, completedContainerCount.get(runnableName) + 1);
+ }
removeInstanceId(runnableName, getInstanceId(controller.getRunId()));
resourceReport.removeRunnableResources(runnableName, containerId);
}
@@ -456,6 +495,21 @@ final class RunningContainers {
}
/**
+ * Given the containerId, removes the corresponding containerInfo.
+ * @param containerId Id for the container to be removed.
+ * @return Returns {@code false} if container with the provided id was not found, {@code true} otherwise.
+ */
+ private boolean removeContainerInfo(String containerId) {
+ for (ContainerInfo containerInfo : this.containerStats.values()) {
+ if (containerInfo.getId().equals(containerId)) {
+ this.containerStats.values().remove(containerInfo);
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
* A helper class that overrides the debug port of the resources with the live info from the container controller.
*/
private static class DynamicTwillRunResources extends DefaultTwillRunResources {
[4/6] git commit: (TWILL-101) Schedule HDFS delegation token update
before it expires.
Posted by ch...@apache.org.
(TWILL-101) Schedule HDFS delegation token update before it expires.
Signed-off-by: Terence Yim <te...@continuuity.com>
Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/7a72ce18
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/7a72ce18
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/7a72ce18
Branch: refs/heads/site
Commit: 7a72ce18e37d94d5baf0237e24b7aaeb8ca7647f
Parents: 8cf4cd0
Author: Terence Yim <te...@continuuity.com>
Authored: Tue Sep 23 01:58:22 2014 -0700
Committer: Terence Yim <te...@continuuity.com>
Committed: Tue Sep 23 02:21:58 2014 -0700
----------------------------------------------------------------------
.../org/apache/twill/yarn/YarnTwillRunnerService.java | 10 ++++++++--
1 file changed, 8 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/7a72ce18/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
index 4c16cc1..2634e78 100644
--- a/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/yarn/YarnTwillRunnerService.java
@@ -309,8 +309,14 @@ public final class YarnTwillRunnerService extends AbstractIdleService implements
// Schedule an updater for updating HDFS delegation tokens
if (UserGroupInformation.isSecurityEnabled()) {
- long delay = yarnConfig.getLong(DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_KEY,
- DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT);
+ long renewalInterval = yarnConfig.getLong(DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_KEY,
+ DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT);
+ // Schedule it five minutes before it expires.
+ long delay = renewalInterval - TimeUnit.MINUTES.toMillis(5);
+ // Just to safeguard. In practice, the value shouldn't be that small, otherwise nothing could work.
+ if (delay <= 0) {
+ delay = (renewalInterval <= 2) ? 1 : renewalInterval / 2;
+ }
scheduleSecureStoreUpdate(new LocationSecureStoreUpdater(yarnConfig, locationFactory),
delay, delay, TimeUnit.MILLISECONDS);
}
[6/6] git commit: Merge branch 'master' into site
Posted by ch...@apache.org.
Merge branch 'master' into site
Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/128a16f3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/128a16f3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/128a16f3
Branch: refs/heads/site
Commit: 128a16f353e3e71b83ffc9faf7e96f9daa6fc8b9
Parents: 215bc0c 7f3d7b8
Author: Terence Yim <te...@cask.co>
Authored: Tue Oct 7 17:04:22 2014 -0700
Committer: Terence Yim <te...@cask.co>
Committed: Tue Oct 7 17:04:22 2014 -0700
----------------------------------------------------------------------
.../org/apache/twill/yarn/YarnTwillRunnerService.java | 10 ++++++++--
.../src/test/java/org/apache/twill/yarn/DebugTestRun.java | 8 ++++----
.../java/org/apache/twill/yarn/ResourceReportTestRun.java | 10 +++++-----
.../twill/internal/zookeeper/LeaderElectionTest.java | 6 +++---
4 files changed, 20 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
[3/6] git commit: Merge branch 'master' into site
Posted by ch...@apache.org.
Merge branch 'master' into site
Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/215bc0cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/215bc0cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/215bc0cf
Branch: refs/heads/site
Commit: 215bc0cf01f38a1d7f0b95fab4e525bb2c81a38e
Parents: 0004452 8cf4cd0
Author: Terence Yim <te...@continuuity.com>
Authored: Mon Sep 22 22:15:59 2014 -0700
Committer: Terence Yim <te...@continuuity.com>
Committed: Mon Sep 22 22:15:59 2014 -0700
----------------------------------------------------------------------
pom.xml | 2 +
.../main/java/org/apache/twill/api/Hosts.java | 65 +++
.../main/java/org/apache/twill/api/Racks.java | 65 +++
.../apache/twill/api/ResourceSpecification.java | 58 +--
.../apache/twill/api/TwillSpecification.java | 175 ++++++++-
.../internal/DefaultResourceSpecification.java | 25 --
.../internal/DefaultTwillSpecification.java | 86 +++-
.../org/apache/twill/internal/Constants.java | 6 +
.../json/ResourceSpecificationCodec.java | 6 +-
.../json/TwillSpecificationAdapter.java | 3 +
.../internal/json/TwillSpecificationCodec.java | 36 +-
.../json/ResourceSpecificationCodecTest.java | 19 +-
.../internal/yarn/Hadoop20YarnAMClient.java | 17 +-
.../internal/yarn/Hadoop20YarnAppClient.java | 7 +
.../internal/yarn/Hadoop21YarnAMClient.java | 28 +-
.../internal/yarn/Hadoop21YarnAppClient.java | 8 +
.../internal/yarn/Hadoop22YarnAMClient.java | 42 ++
.../appmaster/AllocationSpecification.java | 105 +++++
.../appmaster/ApplicationMasterService.java | 169 +++++++-
.../appmaster/PlacementPolicyManager.java | 104 +++++
.../internal/appmaster/ProvisionRequest.java | 11 +
.../appmaster/RunnableContainerRequest.java | 11 +-
.../appmaster/RunnableProcessLauncher.java | 5 +-
.../internal/appmaster/RunningContainers.java | 54 +++
.../internal/yarn/AbstractYarnAMClient.java | 70 +++-
.../yarn/VersionDetectYarnAMClientFactory.java | 25 +-
.../yarn/VersionDetectYarnAppClientFactory.java | 2 +-
.../twill/internal/yarn/YarnAMClient.java | 15 +
.../twill/internal/yarn/YarnAppClient.java | 10 +
.../apache/twill/internal/yarn/YarnUtils.java | 38 +-
.../apache/twill/yarn/YarnTwillPreparer.java | 4 +-
.../twill/yarn/PlacementPolicyTestRun.java | 391 +++++++++++++++++++
.../org/apache/twill/yarn/YarnTestSuite.java | 3 +-
.../org/apache/twill/yarn/YarnTestUtils.java | 23 +-
34 files changed, 1522 insertions(+), 166 deletions(-)
----------------------------------------------------------------------
[5/6] git commit: Make timeout for test cases longer to avoid failing
too much in Travis when it get scheduled on relatively slow boxes.
Posted by ch...@apache.org.
Make timeout for test cases longer to avoid failing too much in Travis when it get scheduled on relatively slow boxes.
Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/7f3d7b8b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/7f3d7b8b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/7f3d7b8b
Branch: refs/heads/site
Commit: 7f3d7b8b77211819f2f91a561b1a62ce43d510c0
Parents: 7a72ce1
Author: Terence Yim <te...@continuuity.com>
Authored: Tue Sep 23 13:27:10 2014 -0700
Committer: Terence Yim <te...@continuuity.com>
Committed: Tue Sep 23 13:27:10 2014 -0700
----------------------------------------------------------------------
.../src/test/java/org/apache/twill/yarn/DebugTestRun.java | 8 ++++----
.../java/org/apache/twill/yarn/ResourceReportTestRun.java | 10 +++++-----
.../twill/internal/zookeeper/LeaderElectionTest.java | 6 +++---
3 files changed, 12 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/7f3d7b8b/twill-yarn/src/test/java/org/apache/twill/yarn/DebugTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/DebugTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/DebugTestRun.java
index 0cb6fc8..e9f00ad 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/DebugTestRun.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/DebugTestRun.java
@@ -118,9 +118,9 @@ public class DebugTestRun extends BaseYarnTest {
}
}, Threads.SAME_THREAD_EXECUTOR);
- Assert.assertTrue(running.await(30, TimeUnit.SECONDS));
+ Assert.assertTrue(running.await(120, TimeUnit.SECONDS));
Assert.assertTrue(waitForDebugPort(controller, "r1", 30));
- controller.stop().get(30, TimeUnit.SECONDS);
+ controller.stop().get(120, TimeUnit.SECONDS);
// Sleep a bit before exiting.
TimeUnit.SECONDS.sleep(2);
}
@@ -142,10 +142,10 @@ public class DebugTestRun extends BaseYarnTest {
}
}, Threads.SAME_THREAD_EXECUTOR);
- Assert.assertTrue(running.await(30, TimeUnit.SECONDS));
+ Assert.assertTrue(running.await(120, TimeUnit.SECONDS));
Assert.assertTrue(waitForDebugPort(controller, "r1", 30));
Assert.assertTrue(waitForDebugPort(controller, "r2", 30));
- controller.stop().get(30, TimeUnit.SECONDS);
+ controller.stop().get(120, TimeUnit.SECONDS);
// Sleep a bit before exiting.
TimeUnit.SECONDS.sleep(2);
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/7f3d7b8b/twill-yarn/src/test/java/org/apache/twill/yarn/ResourceReportTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/ResourceReportTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/ResourceReportTestRun.java
index 928a525..3a82272 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/ResourceReportTestRun.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/ResourceReportTestRun.java
@@ -103,10 +103,10 @@ public final class ResourceReportTestRun extends BaseYarnTest {
}
}, Threads.SAME_THREAD_EXECUTOR);
- Assert.assertTrue(running.await(30, TimeUnit.SECONDS));
+ Assert.assertTrue(running.await(120, TimeUnit.SECONDS));
Iterable<Discoverable> envEchoServices = controller.discoverService("envecho");
- Assert.assertTrue(YarnTestUtils.waitForSize(envEchoServices, 1, 30));
+ Assert.assertTrue(YarnTestUtils.waitForSize(envEchoServices, 1, 120));
// TODO: check virtual cores once yarn adds the ability
Map<String, String> expectedValues = Maps.newHashMap();
@@ -128,7 +128,7 @@ public final class ResourceReportTestRun extends BaseYarnTest {
}
}
- controller.stop().get(30, TimeUnit.SECONDS);
+ controller.stop().get(120, TimeUnit.SECONDS);
// Sleep a bit before exiting.
TimeUnit.SECONDS.sleep(2);
}
@@ -215,7 +215,7 @@ public final class ResourceReportTestRun extends BaseYarnTest {
}
}, Threads.SAME_THREAD_EXECUTOR);
- Assert.assertTrue(running.await(30, TimeUnit.SECONDS));
+ Assert.assertTrue(running.await(120, TimeUnit.SECONDS));
// wait for 3 echo servers to come up
Iterable<Discoverable> echoServices = controller.discoverService("echo");
@@ -280,7 +280,7 @@ public final class ResourceReportTestRun extends BaseYarnTest {
Assert.assertEquals(512, resources.getMemoryMB());
}
- controller.stop().get(30, TimeUnit.SECONDS);
+ controller.stop().get(120, TimeUnit.SECONDS);
// Sleep a bit before exiting.
TimeUnit.SECONDS.sleep(2);
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/7f3d7b8b/twill-zookeeper/src/test/java/org/apache/twill/internal/zookeeper/LeaderElectionTest.java
----------------------------------------------------------------------
diff --git a/twill-zookeeper/src/test/java/org/apache/twill/internal/zookeeper/LeaderElectionTest.java b/twill-zookeeper/src/test/java/org/apache/twill/internal/zookeeper/LeaderElectionTest.java
index d58c11c..847b149 100644
--- a/twill-zookeeper/src/test/java/org/apache/twill/internal/zookeeper/LeaderElectionTest.java
+++ b/twill-zookeeper/src/test/java/org/apache/twill/internal/zookeeper/LeaderElectionTest.java
@@ -56,7 +56,7 @@ public class LeaderElectionTest {
private static InMemoryZKServer zkServer;
- @Test(timeout = 100000)
+ @Test(timeout = 150000)
public void testElection() throws ExecutionException, InterruptedException, BrokenBarrierException {
ExecutorService executor = Executors.newCachedThreadPool();
@@ -129,7 +129,7 @@ public class LeaderElectionTest {
}
}
- @Test(timeout = 100000)
+ @Test(timeout = 150000)
public void testCancel() throws InterruptedException, IOException {
List<LeaderElection> leaderElections = Lists.newArrayList();
List<ZKClientService> zkClients = Lists.newArrayList();
@@ -213,7 +213,7 @@ public class LeaderElectionTest {
}
}
- @Test(timeout = 100000)
+ @Test(timeout = 150000)
public void testDisconnect() throws IOException, InterruptedException {
File zkDataDir = tmpFolder.newFolder();
InMemoryZKServer ownZKServer = InMemoryZKServer.builder().setDataDir(zkDataDir).build();