You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@twill.apache.org by ch...@apache.org on 2014/04/22 08:06:30 UTC

[33/50] [abbrv] git commit: exposes the YARN client ContainerRequest hosts and racks options into ResourceSpecification #TWILL-40

exposes the YARN client ContainerRequest hosts and racks options into ResourceSpecification #TWILL-40

Signed-off-by: Terence Yim <te...@continuuity.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-twill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-twill/commit/ac67f1f6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-twill/tree/ac67f1f6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-twill/diff/ac67f1f6

Branch: refs/heads/site
Commit: ac67f1f620099e86b5503ff847862306e16c4db9
Parents: a77b67c
Author: Fabian Murariu <mu...@gmail.com>
Authored: Thu Feb 13 19:01:03 2014 +0200
Committer: Terence Yim <te...@continuuity.com>
Committed: Fri Feb 21 12:33:26 2014 -0800

----------------------------------------------------------------------
 pom.xml                                         |  6 +++
 .../apache/twill/api/ResourceSpecification.java | 53 +++++++++++++++-----
 .../internal/DefaultResourceSpecification.java  | 18 +++++++
 twill-core/pom.xml                              |  4 ++
 .../json/ResourceSpecificationCodec.java        | 21 ++++----
 .../json/ResourceSpecificationCodecTest.java    | 51 +++++++++++++++++++
 .../appmaster/ApplicationMasterService.java     |  5 +-
 .../twill/internal/yarn/YarnAMClient.java       | 21 ++++----
 .../apache/twill/yarn/EchoServerTestRun.java    |  2 +
 9 files changed, 147 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/ac67f1f6/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 2141000..0e692f1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -797,6 +797,12 @@
                 <scope>test</scope>
             </dependency>
             <dependency>
+                <groupId>org.unitils</groupId>
+                <artifactId>unitils-core</artifactId>
+                <version>3.3</version>
+                <scope>test</scope>
+            </dependency>
+            <dependency>
                 <groupId>org.apache.commons</groupId>
                 <artifactId>commons-compress</artifactId>
                 <version>${commons-compress.version}</version>

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/ac67f1f6/twill-api/src/main/java/org/apache/twill/api/ResourceSpecification.java
----------------------------------------------------------------------
diff --git a/twill-api/src/main/java/org/apache/twill/api/ResourceSpecification.java b/twill-api/src/main/java/org/apache/twill/api/ResourceSpecification.java
index 0bab811..75427a7 100644
--- a/twill-api/src/main/java/org/apache/twill/api/ResourceSpecification.java
+++ b/twill-api/src/main/java/org/apache/twill/api/ResourceSpecification.java
@@ -79,6 +79,20 @@ public interface ResourceSpecification {
   int getInstances();
 
   /**
+   * Returns the execution hosts, given as fully qualified domain names (host + domain).
+   * This is only a suggestion to the scheduler; depending on cluster load, it may be ignored.
+   * @return An array containing the hosts where the containers should run
+   */
+  String[] getHosts();
+
+  /**
+   * Returns the execution racks.
+   * This is only a suggestion to the scheduler; depending on cluster load, it may be ignored.
+   * @return An array containing the racks where the containers should run
+   */
+  String[] getRacks();
+
+  /**
    * Builder for creating {@link ResourceSpecification}.
    */
   static final class Builder {
@@ -88,11 +102,31 @@ public interface ResourceSpecification {
     private int uplink = -1;
     private int downlink = -1;
     private int instances = 1;
+    private String[] hosts = new String[0];
+    private String[] racks = new String[0];
 
     public static CoreSetter with() {
       return new Builder().new CoreSetter();
     }
 
+    public final class HostsSetter extends Build {
+      public RackSetter setHosts(String... hosts) {
+        if (hosts != null) {
+          Builder.this.hosts = hosts.clone();
+        }
+        return new RackSetter();
+      }
+    }
+
+    public final class RackSetter extends Build {
+      public AfterRacks setRacks(String... racks) {
+        if (racks != null) {
+          Builder.this.racks = racks.clone();
+        }
+        return new AfterRacks();
+      }
+    }
+
     public final class CoreSetter {
       @Deprecated
       public MemorySetter setCores(int cores) {
@@ -114,13 +148,13 @@ public interface ResourceSpecification {
     }
 
     public final class AfterMemory extends Build {
-      public AfterInstances setInstances(int instances) {
+      public HostsSetter setInstances(int instances) {
         Builder.this.instances = instances;
-        return new AfterInstances();
+        return new HostsSetter();
       }
     }
 
-    public final class AfterInstances extends Build {
+    public final class AfterRacks extends Build {
       public AfterUplink setUplink(int uplink, SizeUnit unit) {
         Builder.this.uplink = uplink * unit.multiplier;
         return new AfterUplink();
@@ -128,23 +162,18 @@ public interface ResourceSpecification {
     }
 
     public final class AfterUplink extends Build {
-      public AfterDownlink setDownlink(int downlink, SizeUnit unit) {
+      public Done setDownlink(int downlink, SizeUnit unit) {
         Builder.this.downlink = downlink * unit.multiplier;
-        return new AfterDownlink();
+        return new Done();
       }
     }
 
-    public final class AfterDownlink extends Build {
-
-      @Override
-      public ResourceSpecification build() {
-        return super.build();
-      }
+    public final class Done extends Build {
     }
 
     public abstract class Build {
       public ResourceSpecification build() {
-        return new DefaultResourceSpecification(cores, memory, instances, uplink, downlink);
+        return new DefaultResourceSpecification(cores, memory, instances, uplink, downlink, hosts, racks);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/ac67f1f6/twill-api/src/main/java/org/apache/twill/internal/DefaultResourceSpecification.java
----------------------------------------------------------------------
diff --git a/twill-api/src/main/java/org/apache/twill/internal/DefaultResourceSpecification.java b/twill-api/src/main/java/org/apache/twill/internal/DefaultResourceSpecification.java
index 1327ce5..4451ac1 100644
--- a/twill-api/src/main/java/org/apache/twill/internal/DefaultResourceSpecification.java
+++ b/twill-api/src/main/java/org/apache/twill/internal/DefaultResourceSpecification.java
@@ -28,13 +28,21 @@ public final class DefaultResourceSpecification implements ResourceSpecification
   private final int instances;
   private final int uplink;
   private final int downlink;
+  private final String[] hosts;
+  private final String[] racks;
 
   public DefaultResourceSpecification(int virtualCores, int memorySize, int instances, int uplink, int downlink) {
+    this(virtualCores, memorySize, instances, uplink, downlink, new String[0], new String[0]);
+  }
+
+  public DefaultResourceSpecification(int virtualCores, int memorySize, int instances, int uplink, int downlink, String[] hosts, String[] racks) {
     this.virtualCores = virtualCores;
     this.memorySize = memorySize;
     this.instances = instances;
     this.uplink = uplink;
     this.downlink = downlink;
+    this.hosts = hosts;
+    this.racks = racks;
   }
 
   @Deprecated
@@ -59,6 +67,16 @@ public final class DefaultResourceSpecification implements ResourceSpecification
   }
 
   @Override
+  public String[] getHosts() {
+    return hosts.clone();
+  }
+
+  @Override
+  public String[] getRacks() {
+    return racks.clone();
+  }
+
+  @Override
   public int getUplink() {
     return uplink;
   }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/ac67f1f6/twill-core/pom.xml
----------------------------------------------------------------------
diff --git a/twill-core/pom.xml b/twill-core/pom.xml
index 859c653..124547c 100644
--- a/twill-core/pom.xml
+++ b/twill-core/pom.xml
@@ -86,6 +86,10 @@
             <artifactId>junit</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.unitils</groupId>
+            <artifactId>unitils-core</artifactId>
+        </dependency>
+        <dependency>
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-compress</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/ac67f1f6/twill-core/src/main/java/org/apache/twill/internal/json/ResourceSpecificationCodec.java
----------------------------------------------------------------------
diff --git a/twill-core/src/main/java/org/apache/twill/internal/json/ResourceSpecificationCodec.java b/twill-core/src/main/java/org/apache/twill/internal/json/ResourceSpecificationCodec.java
index d3b9707..e88045c 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/json/ResourceSpecificationCodec.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/json/ResourceSpecificationCodec.java
@@ -17,13 +17,7 @@
  */
 package org.apache.twill.internal.json;
 
-import com.google.gson.JsonDeserializationContext;
-import com.google.gson.JsonDeserializer;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParseException;
-import com.google.gson.JsonSerializationContext;
-import com.google.gson.JsonSerializer;
+import com.google.gson.*;
 import org.apache.twill.api.ResourceSpecification;
 import org.apache.twill.internal.DefaultResourceSpecification;
 
@@ -38,13 +32,13 @@ final class ResourceSpecificationCodec implements JsonSerializer<ResourceSpecifi
   @Override
   public JsonElement serialize(ResourceSpecification src, Type typeOfSrc, JsonSerializationContext context) {
     JsonObject json = new JsonObject();
-
     json.addProperty("cores", src.getVirtualCores());
     json.addProperty("memorySize", src.getMemorySize());
     json.addProperty("instances", src.getInstances());
     json.addProperty("uplink", src.getUplink());
     json.addProperty("downlink", src.getDownlink());
-
+    json.add("hosts", context.serialize(src.getHosts()));
+    json.add("racks", context.serialize(src.getRacks()));
     return json;
   }
 
@@ -52,10 +46,17 @@ final class ResourceSpecificationCodec implements JsonSerializer<ResourceSpecifi
   public ResourceSpecification deserialize(JsonElement json, Type typeOfT,
                                            JsonDeserializationContext context) throws JsonParseException {
     JsonObject jsonObj = json.getAsJsonObject();
+    final String[] hosts = context.deserialize(jsonObj.getAsJsonArray("hosts"), String[].class);
+    final String[] racks = context.deserialize(jsonObj.getAsJsonArray("racks"), String[].class);
     return new DefaultResourceSpecification(jsonObj.get("cores").getAsInt(),
                                             jsonObj.get("memorySize").getAsInt(),
                                             jsonObj.get("instances").getAsInt(),
                                             jsonObj.get("uplink").getAsInt(),
-                                            jsonObj.get("downlink").getAsInt());
+                                            jsonObj.get("downlink").getAsInt(),
+                                            hosts,
+                                            racks);
+
+
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/ac67f1f6/twill-core/src/test/java/org/apache/twill/internal/json/ResourceSpecificationCodecTest.java
----------------------------------------------------------------------
diff --git a/twill-core/src/test/java/org/apache/twill/internal/json/ResourceSpecificationCodecTest.java b/twill-core/src/test/java/org/apache/twill/internal/json/ResourceSpecificationCodecTest.java
new file mode 100644
index 0000000..7bd4f39
--- /dev/null
+++ b/twill-core/src/test/java/org/apache/twill/internal/json/ResourceSpecificationCodecTest.java
@@ -0,0 +1,51 @@
+package org.apache.twill.internal.json;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonElement;
+import org.apache.twill.api.ResourceSpecification;
+import org.apache.twill.internal.DefaultResourceSpecification;
+import org.junit.Test;
+
+import static org.apache.twill.api.ResourceSpecification.SizeUnit.GIGA;
+import static org.junit.Assert.assertEquals;
+import static org.unitils.reflectionassert.ReflectionAssert.assertLenientEquals;
+
+public class ResourceSpecificationCodecTest {
+  private final Gson gson = new GsonBuilder().serializeNulls()
+          .registerTypeAdapter(ResourceSpecification.class, new ResourceSpecificationCodec())
+          .registerTypeAdapter(DefaultResourceSpecification.class, new ResourceSpecificationCodec())
+          .create();
+
+  @Test
+  public void testCodec() throws Exception {
+    String expectedString = "{\"cores\":2,\"memorySize\":1024,\"instances\":2,\"uplink\":100,\"downlink\":100,\"hosts\":[\"one1\",\"two2\"],\"racks\":[\"three3\"]}";
+    final ResourceSpecification expected =
+            new DefaultResourceSpecification(2, 1024, 2, 100, 100, new String[]{"one1", "two2"}, new String[]{"three3"});
+    final String actualString = gson.toJson(expected);
+    assertEquals(expectedString, actualString);
+
+    final JsonElement expectedJson = gson.toJsonTree(expected);
+    final ResourceSpecification actual = gson.fromJson(expectedJson, DefaultResourceSpecification.class);
+    final JsonElement actualJson = gson.toJsonTree(actual);
+
+    assertEquals(expectedJson, actualJson);
+    assertLenientEquals(expected, actual);
+  }
+
+  @Test
+  public void testBuilder() throws Exception {
+    final ResourceSpecification actual = ResourceSpecification.Builder.with()
+            .setVirtualCores(5)
+            .setMemory(4, GIGA)
+            .setInstances(3)
+            .setHosts("a1", "b2", "c3")
+            .setRacks("r2")
+            .setUplink(10, GIGA)
+            .setDownlink(5, GIGA).build();
+    final DefaultResourceSpecification expectd =
+            new DefaultResourceSpecification(5, 4096, 3, 10240, 5120, new String[]{"a1", "b2", "c3"}, new String[]{"r2"});
+    assertLenientEquals(expectd, actual);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/ac67f1f6/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
index 7caedad..3a6ce20 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
@@ -507,7 +507,10 @@ public final class ApplicationMasterService extends AbstractTwillService {
       if (newContainers > 0) {
         // TODO: Allow user to set priority?
         LOG.info("Request {} container with capability {}", newContainers, capability);
-        String requestId = amClient.addContainerRequest(capability, newContainers).setPriority(0).apply();
+        String requestId = amClient.addContainerRequest(capability, newContainers)
+                .addHosts(runtimeSpec.getResourceSpecification().getHosts())
+                .addRacks(runtimeSpec.getResourceSpecification().getRacks())
+                .setPriority(0).apply();
         provisioning.add(new ProvisionRequest(runtimeSpec, requestId, newContainers));
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/ac67f1f6/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClient.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClient.java b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClient.java
index b0dbce0..c8da649 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClient.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/yarn/YarnAMClient.java
@@ -27,10 +27,9 @@ import org.apache.twill.internal.ProcessLauncher;
 
 import java.net.InetSocketAddress;
 import java.net.URL;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
+
+import static org.apache.commons.lang.ArrayUtils.isEmpty;
 
 /**
  * This interface provides abstraction for AM to interacts with YARN to abstract out YARN version specific
@@ -54,12 +53,12 @@ public interface YarnAMClient extends Service {
       this.count = count;
     }
 
-    public ContainerRequestBuilder addHosts(String firstHost, String...moreHosts) {
-      return add(hosts, firstHost, moreHosts);
+    public ContainerRequestBuilder addHosts(String...newHosts) {
+      return add(hosts, newHosts);
     }
 
-    public ContainerRequestBuilder addRacks(String firstRack, String...moreRacks) {
-      return add(racks, firstRack, moreRacks);
+    public ContainerRequestBuilder addRacks(String...newRacks) {
+      return add(racks, newRacks);
     }
 
     public ContainerRequestBuilder setPriority(int prio) {
@@ -72,9 +71,9 @@ public interface YarnAMClient extends Service {
      */
     public abstract String apply();
 
-    private <T> ContainerRequestBuilder add(Collection<T> collection, T first, T... more) {
-      collection.add(first);
-      Collections.addAll(collection, more);
+    private <T> ContainerRequestBuilder add(Collection<T> collection, T... more) {
+      if (!isEmpty(more))
+        Collections.addAll(collection, more);
       return this;
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-twill/blob/ac67f1f6/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java
----------------------------------------------------------------------
diff --git a/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java b/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java
index 23fc82b..1c3034c 100644
--- a/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java
+++ b/twill-yarn/src/test/java/org/apache/twill/yarn/EchoServerTestRun.java
@@ -61,6 +61,8 @@ public final class EchoServerTestRun extends BaseYarnTest {
                                                          .setVirtualCores(1)
                                                          .setMemory(1, ResourceSpecification.SizeUnit.GIGA)
                                                          .setInstances(2)
+                                                         .setHosts("someHost1.domain.no","someHost2.domain.no") /*demo only ignored in this test*/
+                                                         .setRacks("someRack1") /* demo only ignored in this test*/
                                                          .build())
                                         .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
                                         .withApplicationArguments("echo")