You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by nw...@apache.org on 2019/01/30 06:58:35 UTC

[incubator-heron] branch master updated: [Streamlet] Fix streamlet config to apply cpu and ram in topology (#3180)

This is an automated email from the ASF dual-hosted git repository.

nwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a6949b  [Streamlet] Fix streamlet config to apply cpu and ram in topology (#3180)
8a6949b is described below

commit 8a6949bcd02890eccc756f576f115f84cfaf25a8
Author: Ning Wang <nw...@twitter.com>
AuthorDate: Tue Jan 29 22:58:30 2019 -0800

    [Streamlet] Fix streamlet config to apply cpu and ram in topology (#3180)
    
    * [Streamlet] Fix streamlet config to apply cpu and ram in topology
    
    * Set undefined cpu and ram to -1
    
    * fix unit tests
---
 .../java/org/apache/heron/streamlet/Config.java    | 49 ++++++++++++----------
 .../heron/streamlet/impl/StreamletImplTest.java    | 26 ++++++++++--
 2 files changed, 49 insertions(+), 26 deletions(-)

diff --git a/heron/api/src/java/org/apache/heron/streamlet/Config.java b/heron/api/src/java/org/apache/heron/streamlet/Config.java
index f4b29c8..c9c5338 100644
--- a/heron/api/src/java/org/apache/heron/streamlet/Config.java
+++ b/heron/api/src/java/org/apache/heron/streamlet/Config.java
@@ -22,6 +22,7 @@ package org.apache.heron.streamlet;
 
 import java.io.Serializable;
 
+import org.apache.heron.common.basics.ByteAmount;
 import org.apache.heron.streamlet.impl.KryoSerializer;
 
 /**
@@ -33,7 +34,7 @@ import org.apache.heron.streamlet.impl.KryoSerializer;
 public final class Config implements Serializable {
   private static final long serialVersionUID = 6204498077403076352L;
   private final double cpu;
-  private final long ram;
+  private final ByteAmount ram;
   private final DeliverySemantics deliverySemantics;
   private final Serializer serializer;
   private org.apache.heron.api.Config heronConfig;
@@ -61,9 +62,8 @@ public final class Config implements Serializable {
 
   private static class Defaults {
     static final boolean USE_KRYO = true;
-    static final org.apache.heron.api.Config CONFIG = new org.apache.heron.api.Config();
-    static final double CPU = 1.0;
-    static final long RAM = 100 * MB;
+    static final double CPU = -1.0;                             // -1 means undefined
+    static final ByteAmount RAM = ByteAmount.fromBytes(-1);     // -1 means undefined
     static final DeliverySemantics SEMANTICS = DeliverySemantics.ATMOST_ONCE;
     static final Serializer SERIALIZER = Serializer.KRYO;
   }
@@ -110,15 +110,15 @@ public final class Config implements Serializable {
    * @return the per-container RAM in bytes
    */
   public long getPerContainerRam() {
-    return ram;
+    return getPerContainerRamAsBytes();
   }
 
   /**
-   * Gets the RAM used per topology container as a number of gigabytes
-   * @return the per-container RAM in gigabytes
+   * Gets the RAM used per topology container as a number of bytes
+   * @return the per-container RAM in bytes
    */
-  public long getPerContainerRamAsGigabytes() {
-    return Math.round((double) ram / GB);
+  public long getPerContainerRamAsBytes() {
+    return ram.asBytes();
   }
 
   /**
@@ -126,15 +126,15 @@ public final class Config implements Serializable {
    * @return the per-container RAM in megabytes
    */
   public long getPerContainerRamAsMegabytes() {
-    return Math.round((double) ram / MB);
+    return ram.asMegabytes();
   }
 
   /**
-   * Gets the RAM used per topology container as a number of bytes
-   * @return the per-container RAM in bytes
+   * Gets the RAM used per topology container as a number of gigabytes
+   * @return the per-container RAM in gigabytes
    */
-  public long getPerContainerRamAsBytes() {
-    return getPerContainerRam();
+  public long getPerContainerRamAsGigabytes() {
+    return ram.asGigabytes();
   }
 
   /**
@@ -170,12 +170,12 @@ public final class Config implements Serializable {
   public static final class Builder {
     private org.apache.heron.api.Config config;
     private double cpu;
-    private long ram;
+    private ByteAmount ram;
     private DeliverySemantics deliverySemantics;
     private Serializer serializer;
 
     private Builder() {
-      config = Defaults.CONFIG;
+      config = new org.apache.heron.api.Config();
       cpu = Defaults.CPU;
       ram = Defaults.RAM;
       deliverySemantics = Defaults.SEMANTICS;
@@ -188,6 +188,9 @@ public final class Config implements Serializable {
      */
     public Builder setPerContainerCpu(double perContainerCpu) {
       this.cpu = perContainerCpu;
+      // Different packing algorithms might use different configs. Set all of them here.
+      config.setContainerCpuRequested(perContainerCpu);
+      config.setContainerMaxCpuHint(perContainerCpu);
       return this;
     }
 
@@ -196,8 +199,7 @@ public final class Config implements Serializable {
      * @param perContainerRam Per-container (per-instance) RAM expressed as a Long.
      */
     public Builder setPerContainerRam(long perContainerRam) {
-      this.ram = perContainerRam;
-      return this;
+      return setPerContainerRamInBytes(perContainerRam);
     }
 
     /**
@@ -205,7 +207,10 @@ public final class Config implements Serializable {
      * @param perContainerRam Per-container (per-instance) RAM expressed as a Long.
      */
     public Builder setPerContainerRamInBytes(long perContainerRam) {
-      this.ram = perContainerRam;
+      this.ram = ByteAmount.fromBytes(perContainerRam);
+      // Different packing algorithms might use different configs. Set all of them here.
+      config.setContainerRamRequested(ram);
+      config.setContainerMaxRamHint(ram);
       return this;
     }
 
@@ -214,8 +219,7 @@ public final class Config implements Serializable {
      * @param perContainerRamMB Per-container (per-instance) RAM expressed as a Long.
      */
     public Builder setPerContainerRamInMegabytes(long perContainerRamMB) {
-      this.ram = perContainerRamMB * MB;
-      return this;
+      return setPerContainerRam(perContainerRamMB * MB);
     }
 
     /**
@@ -223,8 +227,7 @@ public final class Config implements Serializable {
      * @param perContainerRamGB Per-container (per-instance) RAM expressed as a Long.
      */
     public Builder setPerContainerRamInGigabytes(long perContainerRamGB) {
-      this.ram = perContainerRamGB * GB;
-      return this;
+      return setPerContainerRam(perContainerRamGB * GB);
     }
 
     /**
diff --git a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
index 345e417..ea40f01 100644
--- a/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
+++ b/heron/api/tests/java/org/apache/heron/streamlet/impl/StreamletImplTest.java
@@ -563,12 +563,22 @@ public class StreamletImplTest {
   }
 
   @Test
-  public void testConfigBuilder() {
+  public void testConfigBuilderDefaultConfig() {
     Config defaultConfig = Config.defaultConfig();
     assertEquals(defaultConfig.getSerializer(), Config.Serializer.KRYO);
-    assertEquals(0, Double.compare(defaultConfig.getPerContainerCpu(), 1.0));
-    assertEquals(defaultConfig.getPerContainerRam(), ByteAmount.fromMegabytes(100).asBytes());
+    assertEquals(0, Double.compare(defaultConfig.getPerContainerCpu(), -1.0));
+    assertEquals(defaultConfig.getPerContainerRam(), ByteAmount.fromBytes(-1).asBytes());
     assertEquals(defaultConfig.getDeliverySemantics(), Config.DeliverySemantics.ATMOST_ONCE);
+
+    org.apache.heron.api.Config conf = defaultConfig.getHeronConfig();
+    assertFalse(conf.containsKey(org.apache.heron.api.Config.TOPOLOGY_CONTAINER_CPU_REQUESTED));
+    assertFalse(conf.containsKey(org.apache.heron.api.Config.TOPOLOGY_CONTAINER_MAX_CPU_HINT));
+    assertFalse(conf.containsKey(org.apache.heron.api.Config.TOPOLOGY_CONTAINER_RAM_REQUESTED));
+    assertFalse(conf.containsKey(org.apache.heron.api.Config.TOPOLOGY_CONTAINER_MAX_RAM_HINT));
+  }
+
+  @Test
+  public void testConfigBuilderNonDefaultConfig() {
     Config nonDefaultConfig = Config.newBuilder()
         .setDeliverySemantics(Config.DeliverySemantics.EFFECTIVELY_ONCE)
         .setSerializer(Config.Serializer.JAVA)
@@ -581,6 +591,16 @@ public class StreamletImplTest {
     assertEquals(nonDefaultConfig.getPerContainerRamAsGigabytes(), 10);
     assertEquals(nonDefaultConfig.getPerContainerRamAsMegabytes(), 1024 * 10);
     assertEquals(0, Double.compare(nonDefaultConfig.getPerContainerCpu(), 3.5));
+
+    org.apache.heron.api.Config conf = nonDefaultConfig.getHeronConfig();
+    assertEquals(conf.get(org.apache.heron.api.Config.TOPOLOGY_CONTAINER_CPU_REQUESTED),
+                 "3.5");
+    assertEquals(conf.get(org.apache.heron.api.Config.TOPOLOGY_CONTAINER_MAX_CPU_HINT),
+                 "3.5");
+    assertEquals(conf.get(org.apache.heron.api.Config.TOPOLOGY_CONTAINER_RAM_REQUESTED),
+                 "10737418240");
+    assertEquals(conf.get(org.apache.heron.api.Config.TOPOLOGY_CONTAINER_MAX_RAM_HINT),
+                 "10737418240");
   }
 
   @Test