You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2014/09/23 05:23:45 UTC
[2/2] git commit: TWILL-87 : Adding Container Placement Policy
control.
TWILL-87 : Adding Container Placement Policy control.
Signed-off-by: Terence Yim <ch...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/8cf4cd02
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/8cf4cd02
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/8cf4cd02
Branch: refs/heads/master
Commit: 8cf4cd02c922595c703eff4076ca74633421b301
Parents: 34af22b
Author: Gourav Khaneja <go...@gmail.com>
Authored: Sat Aug 2 00:25:02 2014 -0700
Committer: Terence Yim <ch...@apache.org>
Committed: Sat Aug 2 23:04:56 2014 -0700
----------------------------------------------------------------------
pom.xml | 2 +
.../main/java/org/apache/twill/api/Hosts.java | 65 +++
.../main/java/org/apache/twill/api/Racks.java | 65 +++
.../apache/twill/api/ResourceSpecification.java | 58 +--
.../apache/twill/api/TwillSpecification.java | 175 ++++++++-
.../internal/DefaultResourceSpecification.java | 25 --
.../internal/DefaultTwillSpecification.java | 86 +++-
.../org/apache/twill/internal/Constants.java | 6 +
.../json/ResourceSpecificationCodec.java | 6 +-
.../json/TwillSpecificationAdapter.java | 3 +
.../internal/json/TwillSpecificationCodec.java | 36 +-
.../json/ResourceSpecificationCodecTest.java | 19 +-
.../internal/yarn/Hadoop20YarnAMClient.java | 17 +-
.../internal/yarn/Hadoop20YarnAppClient.java | 7 +
.../internal/yarn/Hadoop21YarnAMClient.java | 28 +-
.../internal/yarn/Hadoop21YarnAppClient.java | 8 +
.../internal/yarn/Hadoop22YarnAMClient.java | 42 ++
.../appmaster/AllocationSpecification.java | 105 +++++
.../appmaster/ApplicationMasterService.java | 169 +++++++-
.../appmaster/PlacementPolicyManager.java | 104 +++++
.../internal/appmaster/ProvisionRequest.java | 11 +
.../appmaster/RunnableContainerRequest.java | 11 +-
.../appmaster/RunnableProcessLauncher.java | 5 +-
.../internal/appmaster/RunningContainers.java | 54 +++
.../internal/yarn/AbstractYarnAMClient.java | 70 +++-
.../yarn/VersionDetectYarnAMClientFactory.java | 25 +-
.../yarn/VersionDetectYarnAppClientFactory.java | 2 +-
.../twill/internal/yarn/YarnAMClient.java | 15 +
.../twill/internal/yarn/YarnAppClient.java | 10 +
.../apache/twill/internal/yarn/YarnUtils.java | 38 +-
.../apache/twill/yarn/YarnTwillPreparer.java | 4 +-
.../twill/yarn/PlacementPolicyTestRun.java | 391 +++++++++++++++++++
.../org/apache/twill/yarn/YarnTestSuite.java | 3 +-
.../org/apache/twill/yarn/YarnTestUtils.java | 23 +-
34 files changed, 1522 insertions(+), 166 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index dfb7394..67ab4fc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -508,6 +508,7 @@
<configuration>
<sources>
<source>src/main/hadoop21</source>
+ <source>src/main/hadoop22</source>
</sources>
</configuration>
</execution>
@@ -552,6 +553,7 @@
<configuration>
<sources>
<source>src/main/hadoop21</source>
+ <source>src/main/hadoop22</source>
</sources>
</configuration>
</execution>
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-api/src/main/java/org/apache/twill/api/Hosts.java
----------------------------------------------------------------------
diff --git a/twill-api/src/main/java/org/apache/twill/api/Hosts.java b/twill-api/src/main/java/org/apache/twill/api/Hosts.java
new file mode 100644
index 0000000..90a27ad
--- /dev/null
+++ b/twill-api/src/main/java/org/apache/twill/api/Hosts.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.api;
+
+import com.google.common.collect.ImmutableSet;
+
+import java.util.Arrays;
+import java.util.Set;
+
+/**
+ * Represents an immutable set of host names.
+ */
+public class Hosts {
+  private final Set<String> hosts;
+
+  /**
+   * Creates an instance holding a copy of the given host names.
+   *
+   * @param hosts the host names to copy.
+   */
+  public Hosts(Set<String> hosts) {
+    this.hosts = ImmutableSet.copyOf(hosts);
+  }
+
+  /**
+   * Creates an instance from one or more host names.
+   *
+   * @param host the first host name.
+   * @param moreHosts any further host names.
+   */
+  public Hosts(String host, String...moreHosts) {
+    ImmutableSet.Builder<String> builder = ImmutableSet.builder();
+    builder.add(host);
+    builder.addAll(Arrays.asList(moreHosts));
+    this.hosts = builder.build();
+  }
+
+  /**
+   * Convenience factory for creating a {@link Hosts} instance.
+   *
+   * @param host A host to be added.
+   * @param moreHosts Further hosts to be added.
+   * @return An instance of {@link Hosts} containing the specified hosts.
+   */
+  public static Hosts of(String host, String...moreHosts) {
+    return new Hosts(host, moreHosts);
+  }
+
+  /**
+   * Returns the host names held by this instance.
+   *
+   * @return the set of hosts.
+   */
+  public Set<String> get() {
+    return hosts;
+  }
+
+  @Override
+  public String toString() {
+    return hosts.toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-api/src/main/java/org/apache/twill/api/Racks.java
----------------------------------------------------------------------
diff --git a/twill-api/src/main/java/org/apache/twill/api/Racks.java b/twill-api/src/main/java/org/apache/twill/api/Racks.java
new file mode 100644
index 0000000..04eedd0
--- /dev/null
+++ b/twill-api/src/main/java/org/apache/twill/api/Racks.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.api;
+
+import com.google.common.collect.ImmutableSet;
+
+import java.util.Arrays;
+import java.util.Set;
+
+/**
+ * Represents an immutable set of rack names.
+ */
+public class Racks {
+  private final Set<String> racks;
+
+  /**
+   * Creates an instance holding a copy of the given rack names.
+   *
+   * @param racks the rack names to copy.
+   */
+  public Racks(Set<String> racks) {
+    this.racks = ImmutableSet.copyOf(racks);
+  }
+
+  /**
+   * Creates an instance from one or more rack names.
+   *
+   * @param rack the first rack name.
+   * @param moreRacks any further rack names.
+   */
+  public Racks(String rack, String...moreRacks) {
+    ImmutableSet.Builder<String> builder = ImmutableSet.builder();
+    builder.add(rack);
+    builder.addAll(Arrays.asList(moreRacks));
+    this.racks = builder.build();
+  }
+
+  /**
+   * Convenience factory for creating a {@link Racks} instance.
+   *
+   * @param rack A rack to be added.
+   * @param moreRacks Further racks to be added.
+   * @return An instance of {@link Racks} containing the specified racks.
+   */
+  public static Racks of(String rack, String...moreRacks) {
+    return new Racks(rack, moreRacks);
+  }
+
+  /**
+   * Returns the rack names held by this instance.
+   *
+   * @return the set of racks.
+   */
+  public Set<String> get() {
+    return racks;
+  }
+
+  @Override
+  public String toString() {
+    return racks.toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-api/src/main/java/org/apache/twill/api/ResourceSpecification.java
----------------------------------------------------------------------
diff --git a/twill-api/src/main/java/org/apache/twill/api/ResourceSpecification.java b/twill-api/src/main/java/org/apache/twill/api/ResourceSpecification.java
index 4b602bc..2865caf 100644
--- a/twill-api/src/main/java/org/apache/twill/api/ResourceSpecification.java
+++ b/twill-api/src/main/java/org/apache/twill/api/ResourceSpecification.java
@@ -17,14 +17,8 @@
*/
package org.apache.twill.api;
-import com.google.common.collect.Iterables;
import org.apache.twill.internal.DefaultResourceSpecification;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-
/**
* This interface provides specifications for resource requirements including set and get methods
* for number of cores, amount of memory, and number of instances.
@@ -85,20 +79,6 @@ public interface ResourceSpecification {
int getInstances();
/**
- * Returns the execution hosts, expects Fully Qualified Domain Names host + domain.
- * This is a suggestion for the scheduler depending on cluster load it may ignore it
- * @return An array containing the hosts where the containers should run
- */
- List<String> getHosts();
-
- /**
- * Returns the execution racks.
- * This is a suggestion for the scheduler depending on cluster load it may ignore it
- * @return An array containing the racks where the containers should run
- */
- List<String> getRacks();
-
- /**
* Builder for creating {@link ResourceSpecification}.
*/
static final class Builder {
@@ -108,8 +88,6 @@ public interface ResourceSpecification {
private int uplink = -1;
private int downlink = -1;
private int instances = 1;
- private List<String> hosts = new LinkedList<String>();
- private List<String> racks = new LinkedList<String>();
public static CoreSetter with() {
return new Builder().new CoreSetter();
@@ -150,42 +128,10 @@ public interface ResourceSpecification {
}
public final class AfterUplink extends Build {
- public AfterDownlink setDownlink(int downlink, SizeUnit unit) {
+ public Done setDownlink(int downlink, SizeUnit unit) {
Builder.this.downlink = downlink * unit.multiplier;
- return new AfterDownlink();
- }
- }
-
- public final class AfterHosts extends Build {
- public Done setRacks(String... racks) {
- if (racks != null) {
- Builder.this.racks = Arrays.asList(racks);
- }
return new Done();
}
-
- public Done setRacks(Iterable<String> racks) {
- if (racks != null) {
- Iterables.addAll(Builder.this.racks, racks);
- }
- return new Done();
- }
- }
-
- public final class AfterDownlink extends Build {
- public AfterHosts setHosts(String... hosts) {
- if (hosts != null) {
- Builder.this.hosts = Arrays.asList(hosts);
- }
- return new AfterHosts();
- }
-
- public AfterHosts setHosts(Iterable<String> hosts) {
- if (hosts != null) {
- Iterables.addAll(Builder.this.hosts, hosts);
- }
- return new AfterHosts();
- }
}
public final class Done extends Build {
@@ -193,7 +139,7 @@ public interface ResourceSpecification {
public abstract class Build {
public ResourceSpecification build() {
- return new DefaultResourceSpecification(cores, memory, instances, uplink, downlink, hosts, racks);
+ return new DefaultResourceSpecification(cores, memory, instances, uplink, downlink);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-api/src/main/java/org/apache/twill/api/TwillSpecification.java
----------------------------------------------------------------------
diff --git a/twill-api/src/main/java/org/apache/twill/api/TwillSpecification.java b/twill-api/src/main/java/org/apache/twill/api/TwillSpecification.java
index 3931d04..ed37190 100644
--- a/twill-api/src/main/java/org/apache/twill/api/TwillSpecification.java
+++ b/twill-api/src/main/java/org/apache/twill/api/TwillSpecification.java
@@ -60,6 +60,46 @@ public interface TwillSpecification {
}
/**
+ * Defines a container placement policy.
+ */
+ interface PlacementPolicy {
+
+ /**
+ * The kinds of placement policy that can be assigned to a group of runnables.
+ */
+ enum Type {
+ /**
+ * Runnables should be scattered over different hosts.
+ */
+ DISTRIBUTED,
+ /**
+ * No specific placement policy.
+ */
+ DEFAULT
+ }
+
+ /**
+ * Returns the runnables this policy applies to.
+ *
+ * @return Set of {@link org.apache.twill.api.TwillRunnable} names that belong to this placement policy.
+ */
+ Set<String> getNames();
+
+ /**
+ * Returns the kind of this policy.
+ *
+ * @return {@link org.apache.twill.api.TwillSpecification.PlacementPolicy.Type Type} of this placement policy.
+ */
+ Type getType();
+
+ /**
+ * Returns the hosts named by this policy.
+ *
+ * @return Set of hosts associated with this placement policy.
+ */
+ Set<String> getHosts();
+
+ /**
+ * Returns the racks named by this policy.
+ *
+ * @return Set of racks associated with this placement policy.
+ */
+ Set<String> getRacks();
+ }
+
+ /**
* @return Name of the application.
*/
String getName();
@@ -75,6 +115,11 @@ public interface TwillSpecification {
List<Order> getOrders();
/**
+ * @return Returns all {@link org.apache.twill.api.TwillSpecification.PlacementPolicy} for this application.
+ */
+ List<PlacementPolicy> getPlacementPolicies();
+
+ /**
* @return The {@link EventHandlerSpecification} for the {@link EventHandler} to be used for this application,
* or {@code null} if no event handler has been provided.
*/
@@ -89,6 +134,7 @@ public interface TwillSpecification {
private String name;
private Map<String, RuntimeSpecification> runnables = Maps.newHashMap();
private List<Order> orders = Lists.newArrayList();
+ private List<PlacementPolicy> placementPolicies = Lists.newArrayList();
private EventHandlerSpecification eventHandler;
public static NameSetter with() {
@@ -128,6 +174,8 @@ public interface TwillSpecification {
FirstOrder withOrder();
AfterOrder anyOrder();
+
+ PlacementPolicySetter withPlacementPolicy();
}
public final class RunnableSetter implements MoreRunnable, AfterRunnable {
@@ -170,6 +218,11 @@ public interface TwillSpecification {
public AfterOrder anyOrder() {
return new OrderSetter();
}
+
+ @Override
+ public PlacementPolicySetter withPlacementPolicy() {
+ return new PlacementPolicySetter();
+ }
}
/**
@@ -252,6 +305,125 @@ public interface TwillSpecification {
}
}
+ /**
+ * Interface to add placement policies to the application.
+ * Each {@code add} call describes one placement policy covering all the runnables named in that call.
+ */
+ public interface MorePlacementPolicies {
+
+ /**
+ * Specify hosts for a list of runnables.
+ * @param hosts {@link org.apache.twill.api.Hosts} specifying a set of hosts.
+ * @param runnableName name of the first runnable the policy applies to.
+ * @param runnableNames names of any further runnables covered by the same policy.
+ * @return A reference to either add more placement policies or skip to defining execution order.
+ */
+ PlacementPolicySetter add(Hosts hosts, String runnableName, String... runnableNames);
+
+ /**
+ * Specify racks for a list of runnables.
+ * @param racks {@link org.apache.twill.api.Racks} specifying a set of racks.
+ * @param runnableName name of the first runnable the policy applies to.
+ * @param runnableNames names of any further runnables covered by the same policy.
+ * @return A reference to either add more placement policies or skip to defining execution order.
+ */
+ PlacementPolicySetter add(Racks racks, String runnableName, String... runnableNames);
+
+ /**
+ * Specify hosts and racks for a list of runnables.
+ * @param hosts {@link org.apache.twill.api.Hosts} specifying a set of hosts.
+ * @param racks {@link org.apache.twill.api.Racks} specifying a set of racks.
+ * @param runnableName name of the first runnable the policy applies to.
+ * @param runnableNames names of any further runnables covered by the same policy.
+ * @return A reference to either add more placement policies or skip to defining execution order.
+ */
+ PlacementPolicySetter add(Hosts hosts, Racks racks, String runnableName, String... runnableNames);
+
+ /**
+ * Specify a placement policy for a list of runnables.
+ * @param type {@link PlacementPolicy.Type} specifying a specific placement policy type.
+ * @param runnableName name of the first runnable the policy applies to.
+ * @param runnableNames names of any further runnables covered by the same policy.
+ * @return A reference to either add more placement policies or skip to defining execution order.
+ */
+ PlacementPolicySetter add(PlacementPolicy.Type type, String runnableName, String... runnableNames);
+ }
+
+ /**
+ * Interface to define execution order after adding placement policies.
+ */
+ public interface AfterPlacementPolicy {
+ /**
+ * Start defining execution order.
+ * @return A {@link FirstOrder} used to declare the first group of runnables in the order.
+ */
+ FirstOrder withOrder();
+
+ /**
+ * No particular execution order is needed.
+ * @return An {@link AfterOrder} for continuing the specification without an explicit order.
+ */
+ AfterOrder anyOrder();
+ }
+
+ /**
+  * Builder stage that records placement policies and then hands over to the execution order stage.
+  * A runnable may be named in at most one placement policy.
+  */
+ public final class PlacementPolicySetter implements MorePlacementPolicies, AfterPlacementPolicy {
+
+   @Override
+   public PlacementPolicySetter add(Hosts hosts, String runnableName, String... runnableNames) {
+     return addPlacementPolicy(PlacementPolicy.Type.DEFAULT, hosts, null, runnableName, runnableNames);
+   }
+
+   @Override
+   public PlacementPolicySetter add(Racks racks, String runnableName, String... runnableNames) {
+     return addPlacementPolicy(PlacementPolicy.Type.DEFAULT, null, racks, runnableName, runnableNames);
+   }
+
+   @Override
+   public PlacementPolicySetter add(Hosts hosts, Racks racks, String runnableName, String... runnableNames) {
+     return addPlacementPolicy(PlacementPolicy.Type.DEFAULT, hosts, racks, runnableName, runnableNames);
+   }
+
+   @Override
+   public PlacementPolicySetter add(PlacementPolicy.Type type, String runnableName, String...runnableNames) {
+     return addPlacementPolicy(type, null, null, runnableName, runnableNames);
+   }
+
+   /**
+    * Validates the arguments and records a single placement policy covering all given runnables.
+    *
+    * @param type type of the placement policy; must not be {@code null}.
+    * @param hosts hosts for the policy, or {@code null} if none are specified.
+    * @param racks racks for the policy, or {@code null} if none are specified.
+    * @param runnableName name of the first runnable covered by the policy.
+    * @param runnableNames names of any further runnables covered by the policy.
+    * @return this setter, so that more policies can be added.
+    * @throws IllegalArgumentException if the type or a name is {@code null}, a name does not refer to
+    *         an added runnable, or a runnable already belongs to another placement policy.
+    */
+   private PlacementPolicySetter addPlacementPolicy(PlacementPolicy.Type type, Hosts hosts, Racks racks,
+                                                    String runnableName, String...runnableNames) {
+     // Reject a null type here instead of storing it and failing later when the type is read.
+     Preconditions.checkArgument(type != null, "Placement policy type cannot be null.");
+     checkRunnable(runnableName);
+     Set<String> runnableNamesSet = Sets.newHashSet(runnableName);
+     for (String name : runnableNames) {
+       checkRunnable(name);
+       runnableNamesSet.add(name);
+     }
+     placementPolicies.add(
+       new DefaultTwillSpecification.DefaultPlacementPolicy(runnableNamesSet, type, hosts, racks));
+     return this;
+   }
+
+   /**
+    * Checks that the name is non-null, refers to an added runnable and is not yet in any placement policy.
+    */
+   private void checkRunnable(String name) {
+     Preconditions.checkArgument(name != null, "Name cannot be null.");
+     Preconditions.checkArgument(runnables.containsKey(name), "Runnable not exists.");
+     Preconditions.checkArgument(!contains(name),
+                                 "Runnable (" + name + ") cannot belong to more than one Placement Policy");
+   }
+
+   /**
+    * Tells whether the given runnable is already named by a previously added placement policy.
+    */
+   private boolean contains(String runnableName) {
+     for (PlacementPolicy placementPolicy : placementPolicies) {
+       if (placementPolicy.getNames().contains(runnableName)) {
+         return true;
+       }
+     }
+     return false;
+   }
+
+   @Override
+   public FirstOrder withOrder() {
+     return new OrderSetter();
+   }
+
+   @Override
+   public AfterOrder anyOrder() {
+     return new OrderSetter();
+   }
+ }
+
public interface FirstOrder {
NextOrder begin(String name, String...names);
}
@@ -303,8 +475,7 @@ public interface TwillSpecification {
// For all unordered runnables, add it to the end of orders list
orders.add(new DefaultTwillSpecification.DefaultOrder(runnableNames, Order.Type.STARTED));
-
- return new DefaultTwillSpecification(name, runnables, orders, eventHandler);
+ return new DefaultTwillSpecification(name, runnables, orders, placementPolicies, eventHandler);
}
private void addOrder(final Order.Type type, String name, String...names) {
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-api/src/main/java/org/apache/twill/internal/DefaultResourceSpecification.java
----------------------------------------------------------------------
diff --git a/twill-api/src/main/java/org/apache/twill/internal/DefaultResourceSpecification.java b/twill-api/src/main/java/org/apache/twill/internal/DefaultResourceSpecification.java
index 2998165..1327ce5 100644
--- a/twill-api/src/main/java/org/apache/twill/internal/DefaultResourceSpecification.java
+++ b/twill-api/src/main/java/org/apache/twill/internal/DefaultResourceSpecification.java
@@ -17,12 +17,8 @@
*/
package org.apache.twill.internal;
-import com.google.common.collect.ImmutableList;
import org.apache.twill.api.ResourceSpecification;
-import java.util.Collections;
-import java.util.List;
-
/**
* Straightforward implementation of {@link org.apache.twill.api.ResourceSpecification}.
*/
@@ -32,24 +28,13 @@ public final class DefaultResourceSpecification implements ResourceSpecification
private final int instances;
private final int uplink;
private final int downlink;
- private final List<String> hosts;
- private final List<String> racks;
public DefaultResourceSpecification(int virtualCores, int memorySize, int instances, int uplink, int downlink) {
- this(virtualCores, memorySize, instances, uplink, downlink,
- Collections.<String>emptyList(), Collections.<String>emptyList());
- }
-
- public DefaultResourceSpecification(int virtualCores, int memorySize, int instances,
- int uplink, int downlink,
- List<String> hosts, List<String> racks) {
this.virtualCores = virtualCores;
this.memorySize = memorySize;
this.instances = instances;
this.uplink = uplink;
this.downlink = downlink;
- this.hosts = ImmutableList.copyOf(hosts);
- this.racks = ImmutableList.copyOf(racks);
}
@Deprecated
@@ -74,16 +59,6 @@ public final class DefaultResourceSpecification implements ResourceSpecification
}
@Override
- public List<String> getHosts() {
- return this.hosts;
- }
-
- @Override
- public List<String> getRacks() {
- return this.racks;
- }
-
- @Override
public int getUplink() {
return uplink;
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-api/src/main/java/org/apache/twill/internal/DefaultTwillSpecification.java
----------------------------------------------------------------------
diff --git a/twill-api/src/main/java/org/apache/twill/internal/DefaultTwillSpecification.java b/twill-api/src/main/java/org/apache/twill/internal/DefaultTwillSpecification.java
index fdb8b32..184afb3 100644
--- a/twill-api/src/main/java/org/apache/twill/internal/DefaultTwillSpecification.java
+++ b/twill-api/src/main/java/org/apache/twill/internal/DefaultTwillSpecification.java
@@ -22,9 +22,12 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.twill.api.EventHandlerSpecification;
+import org.apache.twill.api.Hosts;
+import org.apache.twill.api.Racks;
import org.apache.twill.api.RuntimeSpecification;
import org.apache.twill.api.TwillSpecification;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -38,13 +41,16 @@ public final class DefaultTwillSpecification implements TwillSpecification {
private final String name;
private final Map<String, RuntimeSpecification> runnables;
private final List<Order> orders;
+ private final List<PlacementPolicy> placementPolicies;
private final EventHandlerSpecification eventHandler;
public DefaultTwillSpecification(String name, Map<String, RuntimeSpecification> runnables,
- List<Order> orders, EventHandlerSpecification eventHandler) {
+ List<Order> orders, List<PlacementPolicy> placementPolicies,
+ EventHandlerSpecification eventHandler) {
this.name = name;
this.runnables = ImmutableMap.copyOf(runnables);
this.orders = ImmutableList.copyOf(orders);
+ // Take an immutable copy, as done for runnables and orders, so later changes to the caller's list
+ // cannot alter this specification. A null list is treated as "no placement policies".
+ this.placementPolicies = placementPolicies == null
+   ? ImmutableList.<PlacementPolicy>of()
+   : ImmutableList.copyOf(placementPolicies);
this.eventHandler = eventHandler;
}
@@ -63,6 +69,11 @@ public final class DefaultTwillSpecification implements TwillSpecification {
return orders;
}
+ @Override
+ public List<PlacementPolicy> getPlacementPolicies() {
+ return placementPolicies;
+ }
+
@Nullable
@Override
public EventHandlerSpecification getEventHandler() {
@@ -100,4 +111,77 @@ public final class DefaultTwillSpecification implements TwillSpecification {
.toString();
}
}
+
+ /**
+  * Plain value-holder implementation of {@link org.apache.twill.api.TwillSpecification.PlacementPolicy}.
+  */
+ public static final class DefaultPlacementPolicy implements PlacementPolicy {
+
+   private final Set<String> names;
+   private final Type type;
+   private final Hosts hosts;
+   private final Racks racks;
+
+   /**
+    * Creates a placement policy of the given type with no hosts and no racks.
+    *
+    * @param names names of the runnables covered by this policy.
+    * @param type type of this policy.
+    */
+   public DefaultPlacementPolicy(Iterable<String> names, Type type) {
+     this(names, type, null, null);
+   }
+
+   /**
+    * Creates a placement policy.
+    *
+    * @param names names of the runnables covered by this policy; copied into an immutable set.
+    * @param type type of this policy.
+    * @param hosts hosts of this policy, or {@code null} for none.
+    * @param racks racks of this policy, or {@code null} for none.
+    */
+   public DefaultPlacementPolicy(Iterable<String> names, Type type, Hosts hosts, Racks racks) {
+     this.names = ImmutableSet.copyOf(names);
+     this.type = type;
+     this.hosts = hosts;
+     this.racks = racks;
+   }
+
+   /**
+    * @return Set of {@link org.apache.twill.api.TwillRunnable} names that belong to this placement policy.
+    */
+   @Override
+   public Set<String> getNames() {
+     return names;
+   }
+
+   /**
+    * @return {@link org.apache.twill.api.TwillSpecification.PlacementPolicy.Type Type} of this placement policy.
+    */
+   @Override
+   public Type getType() {
+     return type;
+   }
+
+   /**
+    * @return Set of hosts associated with this placement policy; empty if no hosts were given.
+    */
+   @Override
+   public Set<String> getHosts() {
+     return hosts == null ? Collections.<String>emptySet() : hosts.get();
+   }
+
+   /**
+    * @return Set of racks associated with this placement policy; empty if no racks were given.
+    */
+   @Override
+   public Set<String> getRacks() {
+     return racks == null ? Collections.<String>emptySet() : racks.get();
+   }
+
+   /**
+    * @return String representation of this placement policy, listing names, type, hosts and racks.
+    */
+   @Override
+   public String toString() {
+     return Objects.toStringHelper(this)
+       .add("names", names)
+       .add("type", type)
+       .add("hosts", hosts)
+       .add("racks", racks)
+       .toString();
+   }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-core/src/main/java/org/apache/twill/internal/Constants.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/Constants.java b/twill-core/src/main/java/org/apache/twill/internal/Constants.java
index 9912638..fbc6e70 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/Constants.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/Constants.java
@@ -31,6 +31,12 @@ public final class Constants {
public static final long PROVISION_TIMEOUT = 30000;
+ /**
+ * Milliseconds AM should wait for RM to allocate a constrained provision request.
+ * On timeout, AM relaxes the request constraints.
+ */
+ public static final int CONSTRAINED_PROVISION_REQUEST_TIMEOUT = 5000;
+
/** Memory size of AM. */
public static final int APP_MASTER_MEMORY_MB = 512;
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-core/src/main/java/org/apache/twill/internal/json/ResourceSpecificationCodec.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/json/ResourceSpecificationCodec.java b/twill-core/src/main/java/org/apache/twill/internal/json/ResourceSpecificationCodec.java
index 5f7d7ae..412e406 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/json/ResourceSpecificationCodec.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/json/ResourceSpecificationCodec.java
@@ -44,8 +44,6 @@ final class ResourceSpecificationCodec implements JsonSerializer<ResourceSpecifi
json.addProperty("instances", src.getInstances());
json.addProperty("uplink", src.getUplink());
json.addProperty("downlink", src.getDownlink());
- json.add("hosts", context.serialize(src.getHosts()));
- json.add("racks", context.serialize(src.getRacks()));
return json;
}
@@ -59,9 +57,7 @@ final class ResourceSpecificationCodec implements JsonSerializer<ResourceSpecifi
jsonObj.get("memorySize").getAsInt(),
jsonObj.get("instances").getAsInt(),
jsonObj.get("uplink").getAsInt(),
- jsonObj.get("downlink").getAsInt(),
- hosts,
- racks);
+ jsonObj.get("downlink").getAsInt());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-core/src/main/java/org/apache/twill/internal/json/TwillSpecificationAdapter.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/json/TwillSpecificationAdapter.java b/twill-core/src/main/java/org/apache/twill/internal/json/TwillSpecificationAdapter.java
index 67c15a2..d882096 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/json/TwillSpecificationAdapter.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/json/TwillSpecificationAdapter.java
@@ -36,6 +36,7 @@ import org.apache.twill.api.TwillRunnableSpecification;
import org.apache.twill.api.TwillSpecification;
import org.apache.twill.internal.json.TwillSpecificationCodec.EventHandlerSpecificationCoder;
import org.apache.twill.internal.json.TwillSpecificationCodec.TwillSpecificationOrderCoder;
+import org.apache.twill.internal.json.TwillSpecificationCodec.TwillSpecificationPlacementPolicyCoder;
import java.io.File;
import java.io.IOException;
@@ -61,6 +62,8 @@ public final class TwillSpecificationAdapter {
.serializeNulls()
.registerTypeAdapter(TwillSpecification.class, new TwillSpecificationCodec())
.registerTypeAdapter(TwillSpecification.Order.class, new TwillSpecificationOrderCoder())
+ .registerTypeAdapter(TwillSpecification.PlacementPolicy.class,
+ new TwillSpecificationPlacementPolicyCoder())
.registerTypeAdapter(EventHandlerSpecification.class, new EventHandlerSpecificationCoder())
.registerTypeAdapter(RuntimeSpecification.class, new RuntimeSpecificationCodec())
.registerTypeAdapter(TwillRunnableSpecification.class, new TwillRunnableSpecificationCodec())
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-core/src/main/java/org/apache/twill/internal/json/TwillSpecificationCodec.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/json/TwillSpecificationCodec.java b/twill-core/src/main/java/org/apache/twill/internal/json/TwillSpecificationCodec.java
index d46434a..96df950 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/json/TwillSpecificationCodec.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/json/TwillSpecificationCodec.java
@@ -26,6 +26,8 @@ import com.google.gson.JsonParseException;
import com.google.gson.JsonSerializationContext;
import com.google.gson.JsonSerializer;
import org.apache.twill.api.EventHandlerSpecification;
+import org.apache.twill.api.Hosts;
+import org.apache.twill.api.Racks;
import org.apache.twill.api.RuntimeSpecification;
import org.apache.twill.api.TwillSpecification;
import org.apache.twill.internal.DefaultEventHandlerSpecification;
@@ -50,6 +52,8 @@ final class TwillSpecificationCodec implements JsonSerializer<TwillSpecification
new TypeToken<Map<String, RuntimeSpecification>>() { }.getType()));
json.add("orders", context.serialize(src.getOrders(),
new TypeToken<List<TwillSpecification.Order>>() { }.getType()));
+ json.add("placementPolicies", context.serialize(
+ src.getPlacementPolicies(), new TypeToken<List<TwillSpecification.PlacementPolicy>>() { }.getType()));
EventHandlerSpecification eventHandler = src.getEventHandler();
if (eventHandler != null) {
json.add("handler", context.serialize(eventHandler, EventHandlerSpecification.class));
@@ -68,6 +72,8 @@ final class TwillSpecificationCodec implements JsonSerializer<TwillSpecification
jsonObj.get("runnables"), new TypeToken<Map<String, RuntimeSpecification>>() { }.getType());
List<TwillSpecification.Order> orders = context.deserialize(
jsonObj.get("orders"), new TypeToken<List<TwillSpecification.Order>>() { }.getType());
+ List<TwillSpecification.PlacementPolicy> placementPolicies = context.deserialize(
+ jsonObj.get("placementPolicies"), new TypeToken<List<TwillSpecification.PlacementPolicy>>() { }.getType());
JsonElement handler = jsonObj.get("handler");
EventHandlerSpecification eventHandler = null;
@@ -75,7 +81,7 @@ final class TwillSpecificationCodec implements JsonSerializer<TwillSpecification
eventHandler = context.deserialize(handler, EventHandlerSpecification.class);
}
- return new DefaultTwillSpecification(name, runnables, orders, eventHandler);
+ return new DefaultTwillSpecification(name, runnables, orders, placementPolicies, eventHandler);
}
static final class TwillSpecificationOrderCoder implements JsonSerializer<TwillSpecification.Order>,
@@ -101,6 +107,34 @@ final class TwillSpecificationCodec implements JsonSerializer<TwillSpecification
}
}
+ static final class TwillSpecificationPlacementPolicyCoder implements
+     JsonSerializer<TwillSpecification.PlacementPolicy>, JsonDeserializer<TwillSpecification.PlacementPolicy> {
+   private static final Type STRING_SET = new TypeToken<Set<String>>() { }.getType();  // names, hosts and racks
+
+   @Override
+   public JsonElement serialize(TwillSpecification.PlacementPolicy src, Type typeOfSrc,
+                                JsonSerializationContext context) {
+     JsonObject policyJson = new JsonObject();
+     policyJson.add("names", context.serialize(src.getNames(), STRING_SET));
+     policyJson.addProperty("type", src.getType().name());
+     policyJson.add("hosts", context.serialize(src.getHosts(), STRING_SET));
+     policyJson.add("racks", context.serialize(src.getRacks(), STRING_SET));
+     return policyJson;
+   }
+
+   @Override
+   public TwillSpecification.PlacementPolicy deserialize(JsonElement json, Type typeOfT,
+       JsonDeserializationContext context) throws JsonParseException {
+     JsonObject policyJson = json.getAsJsonObject();
+     Set<String> names = context.deserialize(policyJson.get("names"), STRING_SET);
+     TwillSpecification.PlacementPolicy.Type type =
+       TwillSpecification.PlacementPolicy.Type.valueOf(policyJson.get("type").getAsString());
+     Set<String> hosts = context.deserialize(policyJson.get("hosts"), STRING_SET);
+     Set<String> racks = context.deserialize(policyJson.get("racks"), STRING_SET);
+     return new DefaultTwillSpecification.DefaultPlacementPolicy(names, type, new Hosts(hosts), new Racks(racks));
+   }
+ }
+
static final class EventHandlerSpecificationCoder implements JsonSerializer<EventHandlerSpecification>,
JsonDeserializer<EventHandlerSpecification> {
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-core/src/test/java/org/apache/twill/internal/json/ResourceSpecificationCodecTest.java
----------------------------------------------------------------------
diff --git a/twill-core/src/test/java/org/apache/twill/internal/json/ResourceSpecificationCodecTest.java b/twill-core/src/test/java/org/apache/twill/internal/json/ResourceSpecificationCodecTest.java
index e8b0eff..2e949d4 100644
--- a/twill-core/src/test/java/org/apache/twill/internal/json/ResourceSpecificationCodecTest.java
+++ b/twill-core/src/test/java/org/apache/twill/internal/json/ResourceSpecificationCodecTest.java
@@ -26,8 +26,6 @@ import org.junit.Assert;
import org.junit.Test;
import org.unitils.reflectionassert.ReflectionAssert;
-import java.util.Arrays;
-
/**
* Maybe this checkstyle rule needs to be removed.
*/
@@ -45,13 +43,10 @@ public class ResourceSpecificationCodecTest {
"\"memorySize\":1024," +
"\"instances\":2," +
"\"uplink\":100," +
- "\"downlink\":100," +
- "\"hosts\":[\"one1\",\"two2\"]," +
- "\"racks\":[\"three3\"]" +
+ "\"downlink\":100" +
"}";
final ResourceSpecification expected =
- new DefaultResourceSpecification(2, 1024, 2, 100, 100,
- Arrays.asList("one1", "two2"), Arrays.asList("three3"));
+ new DefaultResourceSpecification(2, 1024, 2, 100, 100);
final String actualString = gson.toJson(expected);
Assert.assertEquals(expectedString, actualString);
@@ -71,12 +66,9 @@ public class ResourceSpecificationCodecTest {
.setInstances(3)
.setUplink(10, ResourceSpecification.SizeUnit.GIGA)
.setDownlink(5, ResourceSpecification.SizeUnit.GIGA)
- .setHosts("a1", "b2", "c3")
- .setRacks("r2")
.build();
final DefaultResourceSpecification expected =
- new DefaultResourceSpecification(5, 4096, 3, 10240, 5120,
- Arrays.asList("a1", "b2", "c3"), Arrays.asList("r2"));
+ new DefaultResourceSpecification(5, 4096, 3, 10240, 5120);
ReflectionAssert.assertLenientEquals(expected, actual);
}
@@ -88,12 +80,9 @@ public class ResourceSpecificationCodecTest {
.setInstances(3)
.setUplink(10, ResourceSpecification.SizeUnit.GIGA)
.setDownlink(5, ResourceSpecification.SizeUnit.GIGA)
- .setHosts(Arrays.asList("a1", "b2", "c3"))
- .setRacks(Arrays.asList("r2"))
.build();
final DefaultResourceSpecification expected =
- new DefaultResourceSpecification(5, 4096, 3, 10240, 5120,
- Arrays.asList("a1", "b2", "c3"), Arrays.asList("r2"));
+ new DefaultResourceSpecification(5, 4096, 3, 10240, 5120);
ReflectionAssert.assertLenientEquals(expected, actual);
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
index 9b66f67..a990cc0 100644
--- a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
+++ b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAMClient.java
@@ -100,6 +100,11 @@ public final class Hadoop20YarnAMClient extends AbstractYarnAMClient<AMRMClient.
}
@Override
+ public int getNMPort() {
+ return Integer.parseInt(System.getenv().get(ApplicationConstants.NM_PORT_ENV));
+ }
+
+ @Override
protected Resource adjustCapability(Resource resource) {
int cores = YarnUtils.getVirtualCores(resource);
int updatedCores = Math.max(Math.min(cores, YarnUtils.getVirtualCores(maxCapability)),
@@ -123,7 +128,9 @@ public final class Hadoop20YarnAMClient extends AbstractYarnAMClient<AMRMClient.
@Override
protected AMRMClient.ContainerRequest createContainerRequest(Priority priority, Resource capability,
- @Nullable String[] hosts, @Nullable String[] racks) {
+ @Nullable String[] hosts, @Nullable String[] racks,
+ boolean relaxLocality) {
+ // Ignore relaxLocality param since the corresponding support is not present in Hadoop 2.0.
return new AMRMClient.ContainerRequest(capability, hosts, racks, priority, 1);
}
@@ -138,6 +145,14 @@ public final class Hadoop20YarnAMClient extends AbstractYarnAMClient<AMRMClient.
}
@Override
+ protected void updateBlacklist(List<String> blacklistAdditions, List<String> blacklistRemovals) {
+ // An empty implementation since Blacklist is not supported in Hadoop-2.0.
+ if (recordUnsupportedFeature("blacklist")) {
+ LOG.warn("Blacklist is not supported in Hadoop 2.0");
+ }
+ }
+
+ @Override
protected AllocateResult doAllocate(float progress) throws Exception {
AllocationResponse response = amrmClient.allocate(progress);
List<RunnableProcessLauncher> launchers
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java
index 6fdd99e..3f7422d 100644
--- a/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java
+++ b/twill-yarn/src/main/hadoop20/org/apache/twill/internal/yarn/Hadoop20YarnAppClient.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.DelegationToken;
+import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.YarnClient;
import org.apache.hadoop.yarn.client.YarnClientImpl;
@@ -45,6 +46,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
+import java.util.List;
/**
*
@@ -156,6 +158,11 @@ public final class Hadoop20YarnAppClient extends AbstractIdleService implements
}
@Override
+ public List<NodeReport> getNodeReports() throws Exception {
+ return this.yarnClient.getNodeReports();
+ }
+
+ @Override
protected void startUp() throws Exception {
yarnClient.start();
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java
index 78a5135..c5ac458 100644
--- a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java
+++ b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAMClient.java
@@ -42,11 +42,11 @@ import javax.annotation.Nullable;
/**
*
*/
-public final class Hadoop21YarnAMClient extends AbstractYarnAMClient<AMRMClient.ContainerRequest> {
+public class Hadoop21YarnAMClient extends AbstractYarnAMClient<AMRMClient.ContainerRequest> {
private static final Logger LOG = LoggerFactory.getLogger(Hadoop21YarnAMClient.class);
- private static final Function<ContainerStatus, YarnContainerStatus> STATUS_TRANSFORM;
+ protected static final Function<ContainerStatus, YarnContainerStatus> STATUS_TRANSFORM;
static {
STATUS_TRANSFORM = new Function<ContainerStatus, YarnContainerStatus>() {
@@ -57,9 +57,9 @@ public final class Hadoop21YarnAMClient extends AbstractYarnAMClient<AMRMClient.
};
}
- private final AMRMClient<AMRMClient.ContainerRequest> amrmClient;
- private final Hadoop21YarnNMClient nmClient;
- private Resource maxCapability;
+ protected final AMRMClient<AMRMClient.ContainerRequest> amrmClient;
+ protected final Hadoop21YarnNMClient nmClient;
+ protected Resource maxCapability;
public Hadoop21YarnAMClient(Configuration conf) {
super(ApplicationConstants.Environment.CONTAINER_ID.name());
@@ -95,9 +95,15 @@ public final class Hadoop21YarnAMClient extends AbstractYarnAMClient<AMRMClient.
}
@Override
+ public int getNMPort() {
+ return Integer.parseInt(System.getenv().get(ApplicationConstants.Environment.NM_PORT.name()));
+ }
+
+ @Override
protected AMRMClient.ContainerRequest createContainerRequest(Priority priority, Resource capability,
- @Nullable String[] hosts, @Nullable String[] racks) {
- return new AMRMClient.ContainerRequest(capability, hosts, racks, priority);
+ @Nullable String[] hosts, @Nullable String[] racks,
+ boolean relaxLocality) {
+ return new AMRMClient.ContainerRequest(capability, hosts, racks, priority, relaxLocality);
}
@Override
@@ -111,6 +117,14 @@ public final class Hadoop21YarnAMClient extends AbstractYarnAMClient<AMRMClient.
}
@Override
+ protected void updateBlacklist(List<String> blacklistAdditions, List<String> blacklistRemovals) {
+ // An empty implementation since Blacklist is not supported in Hadoop-2.1 AMRMClient.
+ if (recordUnsupportedFeature("blacklist")) {
+ LOG.warn("Blacklist is not supported in Hadoop 2.1 AMRMClient");
+ }
+ }
+
+ @Override
protected AllocateResult doAllocate(float progress) throws Exception {
AllocateResponse allocateResponse = amrmClient.allocate(progress);
List<RunnableProcessLauncher> launchers
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java
index 20558bb..9b70a86 100644
--- a/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java
+++ b/twill-yarn/src/main/hadoop21/org/apache/twill/internal/yarn/Hadoop21YarnAppClient.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
@@ -41,6 +42,8 @@ import org.apache.twill.internal.appmaster.ApplicationSubmitter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.List;
+
/**
*
*/
@@ -136,6 +139,11 @@ public final class Hadoop21YarnAppClient extends AbstractIdleService implements
}
@Override
+ public List<NodeReport> getNodeReports() throws Exception {
+ return this.yarnClient.getNodeReports();
+ }
+
+ @Override
protected void startUp() throws Exception {
yarnClient.start();
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-yarn/src/main/hadoop22/org/apache/twill/internal/yarn/Hadoop22YarnAMClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/hadoop22/org/apache/twill/internal/yarn/Hadoop22YarnAMClient.java b/twill-yarn/src/main/hadoop22/org/apache/twill/internal/yarn/Hadoop22YarnAMClient.java
new file mode 100644
index 0000000..3ee99e8
--- /dev/null
+++ b/twill-yarn/src/main/hadoop22/org/apache/twill/internal/yarn/Hadoop22YarnAMClient.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.yarn;
+
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * {@link Hadoop21YarnAMClient} variant for Hadoop 2.2 or greater that forwards blacklist updates to the AMRMClient.
+ */
+public final class Hadoop22YarnAMClient extends Hadoop21YarnAMClient {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Hadoop22YarnAMClient.class);
+
+ public Hadoop22YarnAMClient(Configuration conf) {
+ super(conf);
+ }
+
+ @Override
+ protected void updateBlacklist(List<String> blacklistAdditions, List<String> blacklistRemovals) {
+ LOG.debug("Blacklist Additions: {} , Blacklist Removals: {}", blacklistAdditions, blacklistRemovals);
+ amrmClient.updateBlacklist(blacklistAdditions, blacklistRemovals);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/AllocationSpecification.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/AllocationSpecification.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/AllocationSpecification.java
new file mode 100644
index 0000000..38cb32c
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/AllocationSpecification.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.internal.appmaster;
+
+import com.google.common.base.Objects;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+/**
+ * This class defines how the containers should be allocated. Instances are immutable and usable as map keys.
+ */
+public class AllocationSpecification {
+
+  /**
+   * Defines different types of allocation strategies.
+   */
+  enum Type {
+    ALLOCATE_ONE_INSTANCE_AT_A_TIME,
+    DEFAULT
+  }
+
+  /**
+   * Resource specification of runnables.
+   */
+  private final Resource resource;
+
+  /**
+   * Allocation strategy Type.
+   */
+  private final Type type;
+
+  /**
+   * Name of runnable. Set to null if the class represents more than one runnable.
+   */
+  private final String runnableName;
+
+  /**
+   * Instance number for the runnable. Set to 0 if the class represents more than one instance / runnable.
+   */
+  private final int instanceId;
+
+  public AllocationSpecification(Resource resource) {
+    this(resource, Type.DEFAULT, null, 0);
+  }
+
+  public AllocationSpecification(Resource resource, Type type, String runnableName, int instanceId) {
+    this.resource = resource;
+    this.type = type;
+    this.runnableName = runnableName;
+    this.instanceId = instanceId;
+  }
+
+  public Resource getResource() {
+    return this.resource;
+  }
+
+  public Type getType() {
+    return this.type;
+  }
+
+  public String getRunnableName() {
+    return this.runnableName;
+  }
+
+  public int getInstanceId() {
+    return this.instanceId;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    // instanceof evaluates to false for null, so no separate null check is required.
+    if (!(obj instanceof AllocationSpecification)) {
+      return false;
+    }
+    AllocationSpecification other = (AllocationSpecification) obj;
+    return (instanceId == other.instanceId) &&
+      Objects.equal(resource, other.resource) &&
+      Objects.equal(type, other.type) &&
+      Objects.equal(runnableName, other.runnableName);
+  }
+
+  @Override
+  public int hashCode() {
+    // Hash exactly the fields equals() compares. This is null-safe (equals() tolerates a null resource)
+    // and keeps per-instance specifications that share one Resource from colliding in hash-based maps.
+    return Objects.hashCode(resource, type, runnableName, instanceId);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
index c25a002..b4f9997 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
@@ -25,7 +25,6 @@ import com.google.common.base.Throwables;
import com.google.common.collect.HashMultiset;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -64,6 +63,7 @@ import org.apache.twill.filesystem.Location;
import org.apache.twill.internal.AbstractTwillService;
import org.apache.twill.internal.Configs;
import org.apache.twill.internal.Constants;
+import org.apache.twill.internal.ContainerInfo;
import org.apache.twill.internal.DefaultTwillRunResources;
import org.apache.twill.internal.EnvKeys;
import org.apache.twill.internal.JvmOptions;
@@ -129,6 +129,7 @@ public final class ApplicationMasterService extends AbstractTwillService {
private final int reservedMemory;
private final EventHandler eventHandler;
private final Location applicationLocation;
+ private final PlacementPolicyManager placementPolicyManager;
private EmbeddedKafkaServer kafkaServer;
private Queue<RunnableContainerRequest> runnableContainerRequests;
@@ -146,6 +147,7 @@ public final class ApplicationMasterService extends AbstractTwillService {
this.credentials = createCredentials();
this.jvmOpts = loadJvmOptions();
this.reservedMemory = getReservedMemory();
+ this.placementPolicyManager = new PlacementPolicyManager(twillSpec.getPlacementPolicies());
amLiveNode = new ApplicationMasterLiveNodeData(Integer.parseInt(System.getenv(EnvKeys.YARN_APP_ID)),
Long.parseLong(System.getenv(EnvKeys.YARN_APP_ID_CLUSTER_TIME)),
@@ -348,7 +350,7 @@ public final class ApplicationMasterService extends AbstractTwillService {
private void doRun() throws Exception {
// The main loop
- Map.Entry<Resource, ? extends Collection<RuntimeSpecification>> currentRequest = null;
+ Map.Entry<AllocationSpecification, ? extends Collection<RuntimeSpecification>> currentRequest = null;
final Queue<ProvisionRequest> provisioning = Lists.newLinkedList();
YarnAMClient.AllocateHandler allocateHandler = new YarnAMClient.AllocateHandler() {
@@ -363,6 +365,8 @@ public final class ApplicationMasterService extends AbstractTwillService {
}
};
+ long requestStartTime = 0;
+ boolean isRequestRelaxed = false;
long nextTimeoutCheck = System.currentTimeMillis() + Constants.PROVISION_TIMEOUT;
while (isRunning()) {
// Call allocate. It has to be made at first in order to be able to get cluster resource availability.
@@ -383,10 +387,25 @@ public final class ApplicationMasterService extends AbstractTwillService {
runnableContainerRequests.poll();
}
}
+
// Nothing in provision, makes the next batch of provision request
if (provisioning.isEmpty() && currentRequest != null) {
- addContainerRequests(currentRequest.getKey(), currentRequest.getValue(), provisioning);
+ manageBlacklist(currentRequest);
+ addContainerRequests(currentRequest.getKey().getResource(), currentRequest.getValue(), provisioning,
+ currentRequest.getKey().getType());
currentRequest = null;
+ requestStartTime = System.currentTimeMillis();
+ isRequestRelaxed = false;
+ }
+
+ // Check for provision request timeout i.e. check if any provision request has been pending
+ // for more than the designated time. On timeout, relax the request constraints.
+ if (!provisioning.isEmpty() && !isRequestRelaxed &&
+ (System.currentTimeMillis() - requestStartTime) > Constants.CONSTRAINED_PROVISION_REQUEST_TIMEOUT) {
+ LOG.info("Relaxing provisioning constraints for request {}", provisioning.peek().getRequestId());
+ // Clear the blacklist for the pending provision request(s).
+ clearBlacklist();
+ isRequestRelaxed = true;
}
nextTimeoutCheck = checkProvisionTimeout(nextTimeoutCheck);
@@ -398,6 +417,37 @@ public final class ApplicationMasterService extends AbstractTwillService {
}
/**
+ * Manage Blacklist for a given request.
+ */
+ private void manageBlacklist(Map.Entry<AllocationSpecification, ? extends Collection<RuntimeSpecification>> request) {
+   clearBlacklist();
+
+   // Only requests that allocate one instance at a time are constrained through the blacklist.
+   AllocationSpecification allocationSpec = request.getKey();
+   if (allocationSpec.getType() != AllocationSpecification.Type.ALLOCATE_ONE_INSTANCE_AT_A_TIME) {
+     return;
+   }
+   // A runnable without a placement policy (null), or with a non-DISTRIBUTED one, needs no blacklist.
+   String runnableName = allocationSpec.getRunnableName();
+   TwillSpecification.PlacementPolicy placementPolicy = placementPolicyManager.getPlacementPolicy(runnableName);
+   if (placementPolicy == null || placementPolicy.getType() != TwillSpecification.PlacementPolicy.Type.DISTRIBUTED) {
+     return;
+   }
+
+   // Update blacklist with hosts which are running DISTRIBUTED runnables
+   for (String runnable : placementPolicyManager.getFellowRunnables(runnableName)) {
+     for (ContainerInfo containerInfo : runningContainers.getContainerInfo(runnable)) {
+       // Yarn Resource Manager may include port in the node name depending on the setting
+       // YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME. It is safe to add both
+       // the names (with and without port) to the blacklist.
+       String hostName = containerInfo.getHost().getHostName();
+       addToBlacklist(hostName);
+       addToBlacklist(hostName + ":" + containerInfo.getPort());
+     }
+   }
+ }
+
+ /**
* Handling containers that are completed.
*/
private void handleCompleted(List<YarnContainerStatus> completedContainersStatuses) {
@@ -433,13 +483,16 @@ public final class ApplicationMasterService extends AbstractTwillService {
// Invoke event handler for provision request timeout
Map<String, ExpectedContainers.ExpectedCount> expiredRequests = expectedContainers.getAll();
Map<String, Integer> runningCounts = runningContainers.countAll();
+ Map<String, Integer> completedContainerCount = runningContainers.getCompletedContainerCount();
List<EventHandler.TimeoutEvent> timeoutEvents = Lists.newArrayList();
for (Map.Entry<String, ExpectedContainers.ExpectedCount> entry : expiredRequests.entrySet()) {
String runnableName = entry.getKey();
ExpectedContainers.ExpectedCount expectedCount = entry.getValue();
int runningCount = runningCounts.containsKey(runnableName) ? runningCounts.get(runnableName) : 0;
- if (expectedCount.getCount() != runningCount) {
+ int completedCount = completedContainerCount.containsKey(runnableName) ?
+ completedContainerCount.get(runnableName) : 0;
+ if (expectedCount.getCount() > runningCount + completedCount) {
timeoutEvents.add(new EventHandler.TimeoutEvent(runnableName, expectedCount.getCount(),
runningCount, expectedCount.getTimestamp()));
}
@@ -488,42 +541,101 @@ public final class ApplicationMasterService extends AbstractTwillService {
private Queue<RunnableContainerRequest> initContainerRequests() {
// Orderly stores container requests.
Queue<RunnableContainerRequest> requests = Lists.newLinkedList();
- // For each order in the twillSpec, create container request for each runnable.
+ // For each order in the twillSpec, create container request for runnables, depending on Placement policy.
for (TwillSpecification.Order order : twillSpec.getOrders()) {
- // Group container requests based on resource requirement.
- ImmutableMultimap.Builder<Resource, RuntimeSpecification> builder = ImmutableMultimap.builder();
- for (String runnableName : order.getNames()) {
+ Set<String> distributedRunnables = placementPolicyManager.getDistributedRunnables(order.getNames());
+ Set<String> defaultRunnables = Sets.newHashSet();
+ defaultRunnables.addAll(order.getNames());
+ defaultRunnables.removeAll(distributedRunnables);
+ Map<AllocationSpecification, Collection<RuntimeSpecification>> requestsMap = Maps.newHashMap();
+ for (String runnableName : distributedRunnables) {
RuntimeSpecification runtimeSpec = twillSpec.getRunnables().get(runnableName);
Resource capability = createCapability(runtimeSpec.getResourceSpecification());
- builder.put(capability, runtimeSpec);
+ for (int instanceId = 0; instanceId < runtimeSpec.getResourceSpecification().getInstances(); instanceId++) {
+ AllocationSpecification allocationSpecification =
+ new AllocationSpecification(capability, AllocationSpecification.Type.ALLOCATE_ONE_INSTANCE_AT_A_TIME,
+ runnableName, instanceId);
+ addAllocationSpecification(allocationSpecification, requestsMap, runtimeSpec);
+ }
}
- requests.add(new RunnableContainerRequest(order.getType(), builder.build()));
+ for (String runnableName : defaultRunnables) {
+ RuntimeSpecification runtimeSpec = twillSpec.getRunnables().get(runnableName);
+ Resource capability = createCapability(runtimeSpec.getResourceSpecification());
+ AllocationSpecification allocationSpecification = new AllocationSpecification(capability);
+ addAllocationSpecification(allocationSpecification, requestsMap, runtimeSpec);
+ }
+ requests.add(new RunnableContainerRequest(order.getType(), requestsMap));
}
return requests;
}
/**
+ * Appends {@code runtimeSpec} to the collection mapped to {@code allocationSpecification}, creating it if absent.
+ */
+ private void addAllocationSpecification(AllocationSpecification allocationSpecification,
+ Map<AllocationSpecification, Collection<RuntimeSpecification>> map,
+ RuntimeSpecification runtimeSpec) {
+ if (!map.containsKey(allocationSpecification)) {
+ map.put(allocationSpecification, Lists.<RuntimeSpecification>newLinkedList());
+ }
+ map.get(allocationSpecification).add(runtimeSpec);
+ }
+
+ /**
* Adds container requests with the given resource capability for each runtime.
*/
private void addContainerRequests(Resource capability,
Collection<RuntimeSpecification> runtimeSpecs,
- Queue<ProvisionRequest> provisioning) {
+ Queue<ProvisionRequest> provisioning,
+ AllocationSpecification.Type allocationType) {
for (RuntimeSpecification runtimeSpec : runtimeSpecs) {
String name = runtimeSpec.getName();
int newContainers = expectedContainers.getExpected(name) - runningContainers.count(name);
if (newContainers > 0) {
+ if (allocationType.equals(AllocationSpecification.Type.ALLOCATE_ONE_INSTANCE_AT_A_TIME)) {
+ //Spawning 1 instance at a time
+ newContainers = 1;
+ }
+ TwillSpecification.PlacementPolicy placementPolicy = placementPolicyManager.getPlacementPolicy(name);
+ Set<String> hosts = Sets.newHashSet();
+ Set<String> racks = Sets.newHashSet();
+ if (placementPolicy != null) {
+ hosts = placementPolicy.getHosts();
+ racks = placementPolicy.getRacks();
+ }
// TODO: Allow user to set priority?
- LOG.info("Request {} container with capability {}", newContainers, capability);
+ LOG.info("Request {} container with capability {} for runnable {}", newContainers, capability, name);
String requestId = amClient.addContainerRequest(capability, newContainers)
- .addHosts(runtimeSpec.getResourceSpecification().getHosts())
- .addRacks(runtimeSpec.getResourceSpecification().getRacks())
+ .addHosts(hosts)
+ .addRacks(racks)
.setPriority(0).apply();
- provisioning.add(new ProvisionRequest(runtimeSpec, requestId, newContainers));
+ provisioning.add(new ProvisionRequest(runtimeSpec, requestId, newContainers, allocationType));
}
}
}
/**
+ * Add a resource to the blacklist.
+ */
+ private void addToBlacklist(String resource) {
+ amClient.addToBlacklist(resource);
+ }
+
+ /**
+ * Remove a resource from the blacklist.
+ */
+ private void removeFromBlacklist(String resource) {
+ amClient.removeFromBlacklist(resource);
+ }
+
+ /**
+ * Clears the resource blacklist.
+ */
+ private void clearBlacklist() {
+ amClient.clearBlacklist();
+ }
+
+ /**
* Launches runnables in the provisioned containers.
*/
private void launchRunnable(List<? extends ProcessLauncher<YarnContainerInfo>> launchers,
@@ -564,9 +676,12 @@ public final class ApplicationMasterService extends AbstractTwillService {
amClient.completeContainerRequest(provisionRequest.getRequestId());
}
+ if (expectedContainers.getExpected(runnableName) == runningContainers.count(runnableName) ||
+ provisioning.peek().getType().equals(AllocationSpecification.Type.ALLOCATE_ONE_INSTANCE_AT_A_TIME)) {
+ provisioning.poll();
+ }
if (expectedContainers.getExpected(runnableName) == runningContainers.count(runnableName)) {
LOG.info("Runnable " + runnableName + " fully provisioned with " + containerCount + " instances.");
- provisioning.poll();
}
}
}
@@ -708,7 +823,7 @@ public final class ApplicationMasterService extends AbstractTwillService {
}
} else {
// Increase the number of instances
- runnableContainerRequests.add(createRunnableContainerRequest(runnableName));
+ runnableContainerRequests.add(createRunnableContainerRequest(runnableName, newCount - oldCount));
}
} finally {
runningContainers.sendToRunnable(runnableName, message, completion);
@@ -723,6 +838,11 @@ public final class ApplicationMasterService extends AbstractTwillService {
}
private RunnableContainerRequest createRunnableContainerRequest(final String runnableName) {
+ return createRunnableContainerRequest(runnableName, 1);
+ }
+
+ private RunnableContainerRequest createRunnableContainerRequest(final String runnableName,
+ final int numberOfInstances) {
// Find the current order of the given runnable in order to create a RunnableContainerRequest.
TwillSpecification.Order order = Iterables.find(twillSpec.getOrders(), new Predicate<TwillSpecification.Order>() {
@Override
@@ -733,7 +853,20 @@ public final class ApplicationMasterService extends AbstractTwillService {
RuntimeSpecification runtimeSpec = twillSpec.getRunnables().get(runnableName);
Resource capability = createCapability(runtimeSpec.getResourceSpecification());
- return new RunnableContainerRequest(order.getType(), ImmutableMultimap.of(capability, runtimeSpec));
+ Map<AllocationSpecification, Collection<RuntimeSpecification>> requestsMap = Maps.newHashMap();
+ if (placementPolicyManager.getPlacementPolicyType(runnableName).equals(
+ TwillSpecification.PlacementPolicy.Type.DISTRIBUTED)) {
+ for (int instanceId = 0; instanceId < numberOfInstances; instanceId++) {
+ AllocationSpecification allocationSpecification =
+ new AllocationSpecification(capability, AllocationSpecification.Type.ALLOCATE_ONE_INSTANCE_AT_A_TIME,
+ runnableName, instanceId);
+ addAllocationSpecification(allocationSpecification, requestsMap, runtimeSpec);
+ }
+ } else {
+ AllocationSpecification allocationSpecification = new AllocationSpecification(capability);
+ addAllocationSpecification(allocationSpecification, requestsMap, runtimeSpec);
+ }
+ return new RunnableContainerRequest(order.getType(), requestsMap);
}
private Runnable getMessageCompletion(final String messageId, final SettableFuture<String> future) {
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/PlacementPolicyManager.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/PlacementPolicyManager.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/PlacementPolicyManager.java
new file mode 100644
index 0000000..91a58bb
--- /dev/null
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/PlacementPolicyManager.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.twill.internal.appmaster;
+
+import com.google.common.collect.Sets;
+import org.apache.twill.api.TwillSpecification;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * This class provides helper functions for operating on a set of Placement Policies.
+ * Lookups by runnable name scan the policies in list order and use the first policy
+ * whose names contain the runnable.
+ */
+public class PlacementPolicyManager {
+  // The list is kept by reference (not copied) and is never modified by this class.
+  private final List<TwillSpecification.PlacementPolicy> placementPolicies;
+
+  /**
+   * Creates a manager over the given placement policies.
+   * @param placementPolicies Placement policies to operate on.
+   */
+  public PlacementPolicyManager(List<TwillSpecification.PlacementPolicy> placementPolicies) {
+    this.placementPolicies = placementPolicies;
+  }
+
+  /**
+   * Given a set of runnables, get all runnables which belong to DISTRIBUTED placement policies.
+   * @param givenRunnables Set of runnables.
+   * @return Subset of runnables, which belong to DISTRIBUTED placement policies. The returned set is a
+   *         newly created set; the given set is not modified.
+   */
+  public Set<String> getDistributedRunnables(Set<String> givenRunnables) {
+    Set<String> distributedRunnables = getAllDistributedRunnables();
+    distributedRunnables.retainAll(givenRunnables);
+    return distributedRunnables;
+  }
+
+  /**
+   * Given a runnable, get the type of placement policy. Returns DEFAULT if no placement policy is specified.
+   * @param runnableName Name of runnable.
+   * @return Placement policy type of the runnable.
+   */
+  public TwillSpecification.PlacementPolicy.Type getPlacementPolicyType(String runnableName) {
+    TwillSpecification.PlacementPolicy policy = getPlacementPolicy(runnableName);
+    return policy == null ? TwillSpecification.PlacementPolicy.Type.DEFAULT : policy.getType();
+  }
+
+  /**
+   * Get all runnables which belong to the same Placement policy as the given runnable.
+   * Returns an empty set if the runnable does not belong to a placement policy.
+   * @param runnableName Name of runnable.
+   * @return Set of runnables, with same placement policy. This is the set returned by the policy's
+   *         {@code getNames()}, not a copy.
+   */
+  public Set<String> getFellowRunnables(String runnableName) {
+    TwillSpecification.PlacementPolicy policy = getPlacementPolicy(runnableName);
+    return policy == null ? Collections.<String>emptySet() : policy.getNames();
+  }
+
+  /**
+   * Get the placement policy of the runnable.
+   * Returns null if runnable does not belong to a placement policy.
+   * @param runnableName Name of runnable.
+   * @return Placement policy of the runnable.
+   */
+  public TwillSpecification.PlacementPolicy getPlacementPolicy(String runnableName) {
+    for (TwillSpecification.PlacementPolicy placementPolicy : placementPolicies) {
+      if (placementPolicy.getNames().contains(runnableName)) {
+        return placementPolicy;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Gets all runnables which belong to DISTRIBUTED placement policies.
+   * The result is a fresh mutable set, so callers may modify it freely.
+   */
+  private Set<String> getAllDistributedRunnables() {
+    Set<String> distributedRunnables = Sets.newHashSet();
+    for (TwillSpecification.PlacementPolicy placementPolicy : placementPolicies) {
+      if (placementPolicy.getType().equals(TwillSpecification.PlacementPolicy.Type.DISTRIBUTED)) {
+        distributedRunnables.addAll(placementPolicy.getNames());
+      }
+    }
+    return distributedRunnables;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ProvisionRequest.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ProvisionRequest.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ProvisionRequest.java
index 002d2a5..a6eba3e 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ProvisionRequest.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ProvisionRequest.java
@@ -26,11 +26,18 @@ final class ProvisionRequest {
private final RuntimeSpecification runtimeSpec;
private final String requestId;
private int requestCount;
+ private final AllocationSpecification.Type type;
+ /**
+ * Creates a provision request with allocation type {@link AllocationSpecification.Type#DEFAULT}
+ * by delegating to the four-argument constructor.
+ */
ProvisionRequest(RuntimeSpecification runtimeSpec, String requestId, int requestCount) {
+ this(runtimeSpec, requestId, requestCount, AllocationSpecification.Type.DEFAULT);
+ }
+
+ /**
+ * Creates a provision request with an explicit allocation type. All arguments are stored as given.
+ *
+ * @param runtimeSpec runtime specification stored on this request
+ * @param requestId id stored on this request
+ * @param requestCount initial value of the request counter (the counter is decremented elsewhere in this class)
+ * @param type allocation type later returned by {@code getType()}
+ */
+ ProvisionRequest(RuntimeSpecification runtimeSpec, String requestId, int requestCount,
+ AllocationSpecification.Type type) {
this.runtimeSpec = runtimeSpec;
this.requestId = requestId;
this.requestCount = requestCount;
+ this.type = type;
}
RuntimeSpecification getRuntimeSpec() {
@@ -49,4 +56,8 @@ final class ProvisionRequest {
requestCount--;
return requestCount == 0;
}
+
+ /**
+ * Returns the {@link AllocationSpecification.Type} this request was constructed with.
+ */
+ AllocationSpecification.Type getType() {
+ return this.type;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableContainerRequest.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableContainerRequest.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableContainerRequest.java
index de199ad..2105629 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableContainerRequest.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableContainerRequest.java
@@ -20,7 +20,6 @@ package org.apache.twill.internal.appmaster;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.twill.api.RuntimeSpecification;
import org.apache.twill.api.TwillSpecification;
@@ -34,12 +33,12 @@ import java.util.Map;
*/
final class RunnableContainerRequest {
private final TwillSpecification.Order.Type orderType;
- private final Iterator<Map.Entry<Resource, Collection<RuntimeSpecification>>> requests;
+ private final Iterator<Map.Entry<AllocationSpecification, Collection<RuntimeSpecification>>> requests;
RunnableContainerRequest(TwillSpecification.Order.Type orderType,
- Multimap<Resource, RuntimeSpecification> requests) {
+ Map<AllocationSpecification, Collection<RuntimeSpecification>> requests) {
this.orderType = orderType;
- this.requests = requests.asMap().entrySet().iterator();
+ this.requests = requests.entrySet().iterator();
}
TwillSpecification.Order.Type getOrderType() {
@@ -51,8 +50,8 @@ final class RunnableContainerRequest {
* @return The {@link Resource} and {@link Collection} of {@link RuntimeSpecification} or
* {@code null} if there is no more request.
*/
- Map.Entry<Resource, ? extends Collection<RuntimeSpecification>> takeRequest() {
- Map.Entry<Resource, Collection<RuntimeSpecification>> next = Iterators.getNext(requests, null);
+ Map.Entry<AllocationSpecification, ? extends Collection<RuntimeSpecification>> takeRequest() {
+ Map.Entry<AllocationSpecification, Collection<RuntimeSpecification>> next = Iterators.getNext(requests, null);
return next == null ? null : Maps.immutableEntry(next.getKey(), ImmutableList.copyOf(next.getValue()));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableProcessLauncher.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableProcessLauncher.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableProcessLauncher.java
index 2126541..0e3aea5 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableProcessLauncher.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunnableProcessLauncher.java
@@ -68,8 +68,9 @@ public final class RunnableProcessLauncher extends AbstractYarnProcessLauncher<Y
launchContext.setEnvironment(env);
- LOG.info("Launching in container {} at {}, {}",
- containerInfo.getId(), containerInfo.getHost(), launchContext.getCommands());
+ LOG.info("Launching in container {} at {}:{}, {}",
+ containerInfo.getId(), containerInfo.getHost().getHostName(),
+ containerInfo.getPort(), launchContext.getCommands());
final Cancellable cancellable = nmClient.start(containerInfo, launchContext);
launched = true;
http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/8cf4cd02/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
index 5267616..383e5e0 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/RunningContainers.java
@@ -20,10 +20,13 @@ package org.apache.twill.internal.appmaster;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
import com.google.common.collect.Multiset;
import com.google.common.collect.Table;
import com.google.common.util.concurrent.FutureCallback;
@@ -53,6 +56,7 @@ import org.slf4j.LoggerFactory;
import java.util.BitSet;
import java.util.Collection;
+import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
@@ -85,20 +89,24 @@ final class RunningContainers {
// Map from runnableName to a BitSet, with the <instanceId> bit turned on for having an instance running.
private final Map<String, BitSet> runnableInstances;
+ private final Map<String, Integer> completedContainerCount;
private final DefaultResourceReport resourceReport;
private final Deque<String> startSequence;
private final Lock containerLock;
private final Condition containerChange;
private final ZKClient zkClient;
+ private final Multimap<String, ContainerInfo> containerStats;
+ /**
+ * Creates the container bookkeeping for an application. All tracking collections start out empty.
+ *
+ * @param appId application id passed to the {@link DefaultResourceReport}
+ * @param appMasterResources application master resources passed to the {@link DefaultResourceReport}
+ * @param zookeeperClient ZooKeeper client retained in {@code zkClient}
+ */
RunningContainers(String appId, TwillRunResources appMasterResources, ZKClient zookeeperClient) {
containers = HashBasedTable.create();
runnableInstances = Maps.newHashMap();
+ // Per-runnable count of containers that completed with a SUCCESS exit status.
+ completedContainerCount = Maps.newHashMap();
startSequence = Lists.newLinkedList();
containerLock = new ReentrantLock();
containerChange = containerLock.newCondition();
resourceReport = new DefaultResourceReport(appId, appMasterResources);
zkClient = zookeeperClient;
+ // Maps a runnable name to the ContainerInfo of each of its tracked containers.
+ containerStats = HashMultimap.create();
}
/**
@@ -128,6 +136,7 @@ final class RunningContainers {
containerInfo.getHost().getHostName(),
controller);
resourceReport.addRunResources(runnableName, resources);
+ containerStats.put(runnableName, containerInfo);
if (startSequence.isEmpty() || !runnableName.equals(startSequence.peekLast())) {
startSequence.addLast(runnableName);
@@ -157,6 +166,15 @@ final class RunningContainers {
}
/**
+ * Given a runnable name, returns the {@link org.apache.twill.internal.ContainerInfo} of its instances.
+ * NOTE(review): this returns the collection obtained from the {@code containerStats} multimap directly
+ * (not a copy) and does not acquire {@code containerLock}; confirm callers do not need a snapshot.
+ * @param runnableName name of a runnable.
+ * @return a collection of {@link org.apache.twill.internal.ContainerInfo} for instances of a runnable.
+ */
+ Collection<ContainerInfo> getContainerInfo(String runnableName) {
+ return containerStats.get(runnableName);
+ }
+
+ /**
* Stops and removes the last running container of the given runnable.
*/
void removeLast(String runnableName) {
@@ -186,6 +204,7 @@ final class RunningContainers {
LOG.info("Stopping service: {} {}", runnableName, lastController.getRunId());
lastController.stopAndWait();
containers.remove(runnableName, lastContainerId);
+ removeContainerInfo(lastContainerId);
removeInstanceId(runnableName, maxInstanceId);
resourceReport.removeRunnableResources(runnableName, lastContainerId);
containerChange.signalAll();
@@ -232,6 +251,18 @@ final class RunningContainers {
}
}
+ /**
+ * Returns a Map containing number of successfully completed containers for all runnables.
+ * The result is an immutable copy taken while holding {@code containerLock}, so it does not
+ * reflect completions recorded after this method returns.
+ *
+ * @return an immutable map from runnable name to its count of successfully completed containers
+ */
+ Map<String, Integer> getCompletedContainerCount() {
+ containerLock.lock();
+ try {
+ return ImmutableMap.copyOf(completedContainerCount);
+ } finally {
+ containerLock.unlock();
+ }
+ }
+
void sendToAll(Message message, Runnable completion) {
containerLock.lock();
try {
@@ -293,6 +324,7 @@ final class RunningContainers {
}
containers.clear();
runnableInstances.clear();
+ containerStats.clear();
} finally {
containerLock.unlock();
}
@@ -320,6 +352,7 @@ final class RunningContainers {
ContainerState state = status.getState();
try {
+ removeContainerInfo(containerId);
Map<String, TwillContainerController> lookup = containers.column(containerId);
if (lookup.isEmpty()) {
// It's OK because if a container is stopped through removeLast, this would be empty.
@@ -346,6 +379,12 @@ final class RunningContainers {
TwillContainerController controller = completedEntry.getValue();
controller.completed(exitStatus);
+ if (exitStatus == ContainerExitCodes.SUCCESS) {
+ if (!completedContainerCount.containsKey(runnableName)) {
+ completedContainerCount.put(runnableName, 0);
+ }
+ completedContainerCount.put(runnableName, completedContainerCount.get(runnableName) + 1);
+ }
removeInstanceId(runnableName, getInstanceId(controller.getRunId()));
resourceReport.removeRunnableResources(runnableName, containerId);
}
@@ -456,6 +495,21 @@ final class RunningContainers {
}
/**
+   * Given the containerId, removes the corresponding containerInfo.
+   * Only the first container info whose id equals the given id is removed.
+   * @param containerId Id for the container to be removed.
+   * @return Returns {@code false} if container with the provided id was not found, {@code true} otherwise.
+   */
+  private boolean removeContainerInfo(String containerId) {
+    // Remove through the iterator so that exactly the matched entry is dropped in a single pass,
+    // instead of searching the values collection a second time by equality.
+    Iterator<ContainerInfo> infos = this.containerStats.values().iterator();
+    while (infos.hasNext()) {
+      if (infos.next().getId().equals(containerId)) {
+        infos.remove();
+        return true;
+      }
+    }
+    return false;
+  }
+
+ /**
* A helper class that overrides the debug port of the resources with the live info from the container controller.
*/
private static class DynamicTwillRunResources extends DefaultTwillRunResources {