Posted to dev@storm.apache.org by revans2 <gi...@git.apache.org> on 2017/05/12 18:58:26 UTC

[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

GitHub user revans2 opened a pull request:

    https://github.com/apache/storm/pull/2113

    STORM-2497: Let Supervisor enforce memory and add in support for shared memory regions

    This is based off of #2112 so that the formatting is simpler.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/revans2/incubator-storm STORM-2497

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/2113.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2113
    
----
commit d8903b8358497c1b7c0ddb3588e779d5ce7c13b5
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Date:   2017-04-28T21:10:29Z

    STORM-2497: Let Supervisor enforce memory and add in support for shared
    memory regions

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2113#discussion_r120451872
  
    --- Diff: storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java ---
    @@ -728,8 +758,145 @@ protected String javaCmd(String cmd) {
             
             return commandList;
         }
    +    
    +  @Override
    +  public boolean isMemoryLimitViolated(LocalAssignment withUpdatedLimits) throws IOException {
    +    if (super.isMemoryLimitViolated(withUpdatedLimits)) {
    +      return true;
    +    }
    +    if (_resourceIsolationManager != null) {
    +      // In the short term the goal is to not shoot anyone unless we really need to.
    +      // The on heap should limit the memory usage in most cases to a reasonable amount
    +      // If someone is using way more than they requested this is a bug and we should
    +      // not allow it
    +      long usageMb;
    +      long memoryLimitMb;
    +      long hardMemoryLimitOver;
    +      String typeOfCheck;
    +
    +      if (withUpdatedLimits.is_has_node_shared_memory()) {
    +        //We need to do enforcement on a topology level, not a single worker level...
    +        // Because in for cgroups each page in shared memory goes to the worker that touched it
    +        // first. We may need to make this more plugable in the future and let the resource
    +        // isolation manager tell us what to do
    +        usageMb = getTotalTopologyMemoryUsed();
    +        memoryLimitMb = getTotalTopologyMemoryReserved(withUpdatedLimits);
    +        hardMemoryLimitOver = this.hardMemoryLimitOver * getTotalWorkersForThisTopology();
    +        typeOfCheck = "TOPOLOGY " + _topologyId;
    +      } else {
    +        usageMb = getMemoryUsageMb();
    +        memoryLimitMb = this.memoryLimitMB;
    +        hardMemoryLimitOver = this.hardMemoryLimitOver;
    +        typeOfCheck = "WORKER " + _workerId;
    +      }
    +      LOG.debug(
    +          "Enforcing memory usage for {} with usgae of {} out of {} total and a hard limit of {}",
    +          typeOfCheck,
    +          usageMb,
    +          memoryLimitMb,
    +          hardMemoryLimitOver);
    +
    +      if (usageMb <= 0) {
    +        //Looks like usage might now be supported
    +        return false;
    +      }
    +      long hardLimitMb =
    +          memoryLimitMb
    +              + Math.max(
    +                  (long) (memoryLimitMb * (hardMemoryLimitMultiplier - 1.0)), hardMemoryLimitOver);
    --- End diff --
    
    Nit: I think this would be easier to read as max(memoryLimit*hardMultiplier, memoryLimit + hardLimitOver), rather than adding first and then adjusting the multiplier.
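
As a rough illustration of the suggestion above, the sketch below writes the hard limit both ways and prints the results side by side. The class and method names (`HardLimitSketch`, `asWritten`, `asSuggested`) are hypothetical and not part of the pull request. The sample values reuse the defaults quoted elsewhere in this thread (multiplier 2.0, overage 2024), assuming both apply to limits expressed in MB.

```java
public class HardLimitSketch {
    // Form used in the diff: add the larger of the two allowances to the base limit.
    static long asWritten(long memoryLimitMb, double multiplier, long overMb) {
        return memoryLimitMb + Math.max((long) (memoryLimitMb * (multiplier - 1.0)), overMb);
    }

    // Form proposed in the review: take the larger of the two candidate limits directly.
    static long asSuggested(long memoryLimitMb, double multiplier, long overMb) {
        return Math.max((long) (memoryLimitMb * multiplier), memoryLimitMb + overMb);
    }

    public static void main(String[] args) {
        // Hypothetical worker limits in MB, chosen only for illustration.
        long[] limits = {512, 1000, 4096};
        for (long limit : limits) {
            System.out.println(limit + " MB -> " + asWritten(limit, 2.0, 2024)
                + " / " + asSuggested(limit, 2.0, 2024));
        }
    }
}
```

With a multiplier of 2.0 the two forms agree for these inputs (2536, 3024 and 8192 MB). For multipliers that are not exactly representable as a double, the two casts may round differently, so the rewrite is a readability change that should still be checked against the intended rounding.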



[GitHub] storm issue #2113: STORM-2497: Let Supervisor enforce memory and add in supp...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/2113
  
    @kishorvpatil Could you look again?  I had to rebase because of a minor conflict in the DefaultResourceAwareStrategy.  It ended up resulting in only some comment changes to this code, but I wanted to be sure you took a look.



[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2113#discussion_r120400360
  
    --- Diff: conf/defaults.yaml ---
    @@ -306,12 +307,18 @@ storm.cgroup.resources:
     storm.cgroup.hierarchy.name: "storm"
     storm.supervisor.cgroup.rootdir: "storm"
     storm.cgroup.cgexec.cmd: "/bin/cgexec"
    -storm.cgroup.memory.limit.tolerance.margin.mb: 128.0
    +storm.cgroup.memory.limit.tolerance.margin.mb: 0.0
    +storm.supervisor.memory.limit.tolerance.margin.mb: 128.0
    +storm.supervisor.hard.memory.limit.multiplier: 2.0
    +storm.supervisor.hard.memory.limit.overage: 2024
    --- End diff --
    
    Nitpick: Some of these properties are missing the unit name



[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2113#discussion_r120688606
  
    --- Diff: storm-server/src/test/java/org/apache/storm/daemon/nimbus/NimbusTest.java ---
    @@ -24,8 +24,10 @@
     import org.apache.storm.topology.TopologyBuilder;
     import org.junit.Test;
     
    +import static org.junit.Assert.*;
    --- End diff --
    
    No, don't worry about it. Hadn't noticed this was in a test



[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2113#discussion_r120433627
  
    --- Diff: storm-client/src/jvm/org/apache/storm/scheduler/SchedulerAssignmentImpl.java ---
    @@ -19,36 +19,82 @@
     
     import java.util.ArrayList;
     import java.util.Collection;
    +import java.util.Collections;
     import java.util.HashMap;
     import java.util.HashSet;
     import java.util.LinkedList;
     import java.util.List;
     import java.util.Map;
     import java.util.Set;
     
    -//TODO: improve this by maintaining slot -> executors as well for more efficient operations
    +import org.apache.storm.generated.WorkerResources;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
     public class SchedulerAssignmentImpl implements SchedulerAssignment {
    +    private static final Logger LOG = LoggerFactory.getLogger(SchedulerAssignmentImpl.class);
    +
         /**
          * topology-id this assignment is for.
          */
    -    String topologyId;
    +    private final String topologyId;
    +
         /**
          * assignment detail, a mapping from executor to <code>WorkerSlot</code>
          */
    -    Map<ExecutorDetails, WorkerSlot> executorToSlot;
    -    
    -    public SchedulerAssignmentImpl(String topologyId, Map<ExecutorDetails, WorkerSlot> executorToSlots) {
    -        this.topologyId = topologyId;
    -        this.executorToSlot = new HashMap<>(0);
    -        if (executorToSlots != null) {
    -            this.executorToSlot.putAll(executorToSlots);
    +    private final Map<ExecutorDetails, WorkerSlot> executorToSlot = new HashMap<>();
    +    private final Map<WorkerSlot, WorkerResources> resources = new HashMap<>();
    +    private final Map<String, Double> totalSharedOffHeap = new HashMap<>();
    +
    +    public SchedulerAssignmentImpl(String topologyId, Map<ExecutorDetails, WorkerSlot> executorToSlot,
    +            Map<WorkerSlot, WorkerResources> resources, Map<String, Double> totalSharedOffHeap) {
    +        this.topologyId = topologyId;       
    +        if (executorToSlot != null) {
    --- End diff --
    
    Nit: Could get rid of null checks by using Collections.emptyMap() in the constructor in L72 instead
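
One way to read this suggestion, sketched under the assumption that the constructor the reviewer refers to is a convenience constructor that currently passes null: delegate with `Collections.emptyMap()` instead, so the main constructor can copy its arguments without checking for null. `AssignmentSketch` and its simplified `Map<String, Integer>` are hypothetical stand-ins, not the real `SchedulerAssignmentImpl` types.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class AssignmentSketch {
    private final String topologyId;
    private final Map<String, Integer> executorToSlot = new HashMap<>();

    // Convenience constructor: delegates with an empty map rather than null.
    public AssignmentSketch(String topologyId) {
        this(topologyId, Collections.<String, Integer>emptyMap());
    }

    public AssignmentSketch(String topologyId, Map<String, Integer> executorToSlot) {
        this.topologyId = topologyId;
        // No null check here: callers are expected to pass an empty map, never null.
        this.executorToSlot.putAll(executorToSlot);
    }

    public String getTopologyId() {
        return topologyId;
    }

    public Map<String, Integer> getExecutorToSlot() {
        return Collections.unmodifiableMap(executorToSlot);
    }
}
```

The trade-off is that an external caller who still passes null now gets a NullPointerException from `putAll` instead of an empty assignment, so this only works if every call site follows the convention.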



[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2113#discussion_r120648513
  
    --- Diff: storm-client/src/jvm/org/apache/storm/trident/operation/DefaultResourceDeclarer.java ---
    @@ -29,8 +33,8 @@
      */
     public class DefaultResourceDeclarer<T extends DefaultResourceDeclarer> implements ResourceDeclarer<T>, ITridentResource {
     
    -    private Map<String, Number> resources = new HashMap<>();
    -    private static Map<String, Object> conf = Utils.readStormConfig();
    +    private final transient Map<String, Number> resources = new HashMap<>();
    --- End diff --
    
    In Trident, Node inherits from DefaultResourceDeclarer, and Node is serialized out (repeatedly, for different bolts and spouts in the topology).  These fields are never used after the topology is created, so marking them transient reduces the serialized size of a Trident topology.
    
    I'll add a comment in the code to make it clear why.
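
The effect described above can be seen in a small standalone sketch using plain Java serialization. The classes `Plain` and `Trimmed` and the key `onheap.memory.mb` are hypothetical and only stand in for a declarer with and without the `transient` modifier; this is not the Trident code itself.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

public class TransientSketch {
    // The map is written out with every serialized instance.
    static class Plain implements Serializable {
        private static final long serialVersionUID = 1L;
        final Map<String, Number> resources = new HashMap<>();
    }

    // The map is skipped by serialization because it is transient.
    static class Trimmed implements Serializable {
        private static final long serialVersionUID = 1L;
        final transient Map<String, Number> resources = new HashMap<>();
    }

    static byte[] serialize(Object o) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Plain plain = new Plain();
        Trimmed trimmed = new Trimmed();
        plain.resources.put("onheap.memory.mb", 64);
        trimmed.resources.put("onheap.memory.mb", 64);
        System.out.println("plain bytes:   " + serialize(plain).length);
        System.out.println("trimmed bytes: " + serialize(trimmed).length);
        Trimmed copy = (Trimmed) deserialize(serialize(trimmed));
        System.out.println("transient field after round trip: " + copy.resources);
    }
}
```

One caveat the sketch makes visible: with default Java serialization, field initializers of a Serializable class do not run on deserialization, so a `final transient` field comes back as null. That is only safe if, as stated above, the field is never used after the topology is created.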



[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2113#discussion_r120445145
  
    --- Diff: storm-server/src/main/java/org/apache/storm/container/ResourceIsolationInterface.java ---
    @@ -24,53 +24,76 @@
     import java.util.Set;
     
     /**
    - * A plugin to support resource isolation and limitation within Storm
    + * A plugin to support resource isolation and limitation within Storm.
      */
     public interface ResourceIsolationInterface {
    -    
    +
         /**
          * Called when starting up
    +     *
          * @param conf the cluster config
          * @throws IOException on any error.
          */
         void prepare(Map<String, Object> conf) throws IOException;
     
         /**
    -     * This function should be used prior to starting the worker to reserve resources for the worker
    +     * This function should be used prior to starting the worker to reserve resources for the worker.
    +     *
          * @param workerId worker id of the worker to start
    -     * @param resources set of resources to limit
    +     * @param workerMemory the amount of memory for the worker or null if not enforced
    +     * @param workerCpu the amount of cpu for the worker or null if not enforced
          */
    -    void reserveResourcesForWorker(String workerId, Map<String, Number> resources);
    +    void reserveResourcesForWorker(String workerId, Integer workerMemory, Integer workerCpu);
     
         /**
    -     * This function will be called when the worker needs to shutdown.  This function should include logic to clean up after a worker is shutdown
    +     * This function will be called when the worker needs to shutdown. This function should include logic to clean up
    +     * after a worker is shutdown.
    +     *
          * @param workerId worker id to shutdown and clean up after
          */
         void releaseResourcesForWorker(String workerId);
     
         /**
          * After reserving resources for the worker (i.e. calling reserveResourcesForWorker). This function can be used
          * to get the modified command line to launch the worker with resource isolation
    -     * @param existingCommand
    +     *
    +     * @param existingCommand the current command to run that may need to be modified.
          * @return new commandline with necessary additions to launch worker with resource isolation
          */
         List<String> getLaunchCommand(String workerId, List<String> existingCommand);
     
         /**
          * After reserving resources for the worker (i.e. calling reserveResourcesForWorker). this function can be used
          * to get the launch command prefix
    +     *
          * @param workerId the of the worker
          * @return the command line prefix for launching a worker with resource isolation
          */
         List<String> getLaunchCommandPrefix(String workerId);
     
         /**
    -     * Get the list of PIDs currently in an isolated container
    +     * Get the list of PIDs currently in an isolated container.
    +     *
          * @param workerId the id of the worker to get these for
    -     * @return the set of PIDs, this will be combined with
    -     * other ways of getting PIDs. An Empty set if
    -     * no PIDs are found.
    +     * @return the set of PIDs, this will be combined with other ways of getting PIDs. An Empty set if no PIDs are
    +     *     found.
          * @throws IOException on any error
          */
    -    Set<Long> getRunningPIDs(String workerId) throws IOException;
    +    Set<Long> getRunningPids(String workerId) throws IOException;
    +
    +    /**
    +     * Get the current memory usage of the a give worker.
    --- End diff --
    
    the a give worker -> the given worker



[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2113#discussion_r120432477
  
    --- Diff: storm-client/src/jvm/org/apache/storm/scheduler/SchedulerAssignment.java ---
    @@ -21,41 +21,51 @@
     import java.util.Map;
     import java.util.Set;
     
    +import org.apache.storm.generated.Assignment;
    +import org.apache.storm.generated.WorkerResources;
    +
     public interface SchedulerAssignment {
         /**
          * Does this slot occupied by this assignment?
          * @param slot
    -     * @return
    +     * @return true the slot is occupied else false
          */
         public boolean isSlotOccupied(WorkerSlot slot);
     
         /**
          * is the executor assigned?
          * 
          * @param executor
    -     * @return
    +     * @return true it is assigned else false
    --- End diff --
    
    Nit: missing if



[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2113#discussion_r120433134
  
    --- Diff: storm-client/src/jvm/org/apache/storm/scheduler/SchedulerAssignmentImpl.java ---
    @@ -19,36 +19,82 @@
     
     import java.util.ArrayList;
     import java.util.Collection;
    +import java.util.Collections;
     import java.util.HashMap;
     import java.util.HashSet;
     import java.util.LinkedList;
     import java.util.List;
     import java.util.Map;
     import java.util.Set;
     
    -//TODO: improve this by maintaining slot -> executors as well for more efficient operations
    +import org.apache.storm.generated.WorkerResources;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
     public class SchedulerAssignmentImpl implements SchedulerAssignment {
    +    private static final Logger LOG = LoggerFactory.getLogger(SchedulerAssignmentImpl.class);
    +
         /**
          * topology-id this assignment is for.
          */
    -    String topologyId;
    +    private final String topologyId;
    +
         /**
          * assignment detail, a mapping from executor to <code>WorkerSlot</code>
          */
    -    Map<ExecutorDetails, WorkerSlot> executorToSlot;
    -    
    -    public SchedulerAssignmentImpl(String topologyId, Map<ExecutorDetails, WorkerSlot> executorToSlots) {
    -        this.topologyId = topologyId;
    -        this.executorToSlot = new HashMap<>(0);
    -        if (executorToSlots != null) {
    -            this.executorToSlot.putAll(executorToSlots);
    +    private final Map<ExecutorDetails, WorkerSlot> executorToSlot = new HashMap<>();
    +    private final Map<WorkerSlot, WorkerResources> resources = new HashMap<>();
    +    private final Map<String, Double> totalSharedOffHeap = new HashMap<>();
    --- End diff --
    
    Nit: Rename so it's apparent what the string is in this map



[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/2113



[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2113#discussion_r120436933
  
    --- Diff: storm-client/src/jvm/org/apache/storm/scheduler/WorkerSlot.java ---
    @@ -20,23 +20,12 @@
     public class WorkerSlot {
         private String nodeId;
         private int port;
    -    // amount of on-heap memory allocated to it
    -    private double memOnHeap = 0.0;
    -    // amount of off-heap memory allocated to it
    -    private double memOffHeap = 0.0;
    -    // amount of cpu allocated to it
    -    private double cpu = 0.0;
    -    
    -    public WorkerSlot(String nodeId, Number port) {
    -        this(nodeId, port, 0.0, 0.0, 0.0);
    -    }
     
    -    public WorkerSlot(String nodeId, Number port, double memOnHeap, double memOffHeap, double cpu) {
    +    public WorkerSlot(String nodeId, Number port) {
    +        if (port == null) throw new NullPointerException("port cannot be null");
    --- End diff --
    
    I think checkstyle will complain about this due to missing braces. Maybe you can use http://google.github.io/guava/releases/snapshot/api/docs/com/google/common/base/Preconditions.html#checkNotNull(T) instead?
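
A minimal sketch of the same idea follows. To stay self-contained it uses the JDK's `java.util.Objects.requireNonNull` rather than Guava's `Preconditions.checkNotNull`, which is a substitution and not what the reviewer linked. `WorkerSlotSketch` is a hypothetical stand-in for `WorkerSlot`, and converting the port with `intValue()` is an assumption about the rest of the constructor.

```java
import java.util.Objects;

public class WorkerSlotSketch {
    private final String nodeId;
    private final int port;

    public WorkerSlotSketch(String nodeId, Number port) {
        // Fails fast with a descriptive NullPointerException and avoids a brace-less one-line if.
        this.port = Objects.requireNonNull(port, "port cannot be null").intValue();
        this.nodeId = nodeId;
    }

    public String getNodeId() {
        return nodeId;
    }

    public int getPort() {
        return port;
    }
}
```

Either helper keeps the check to a single expression, which sidesteps the brace rule without changing the exception type or message seen by callers.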



[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2113#discussion_r120438318
  
    --- Diff: storm-client/src/jvm/org/apache/storm/topology/BaseConfigurationDeclarer.java ---
    @@ -52,18 +52,20 @@ public T setNumTasks(Number val) {
             return addConfiguration(Config.TOPOLOGY_TASKS, val);
         }
     
    +    @SuppressWarnings("unchecked")
         @Override
         public T setMemoryLoad(Number onHeap) {
             if (onHeap != null) {
    --- End diff --
    
    I'm wondering if it would be nicer to throw an NPE if parameters to these functions are null? Failing quietly could be confusing
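
To make the trade-off concrete, here is a small sketch contrasting the quiet behaviour in the diff with a fail-fast variant. `MemoryLoadSketch`, both setter names, and the key `onheap.memory.mb` are hypothetical; the real `BaseConfigurationDeclarer` stores its settings differently.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class MemoryLoadSketch {
    private final Map<String, Number> conf = new HashMap<>();

    // Quiet variant, mirroring the null check in the diff: a null argument is ignored.
    public MemoryLoadSketch setMemoryLoadQuietly(Number onHeap) {
        if (onHeap != null) {
            conf.put("onheap.memory.mb", onHeap);
        }
        return this;
    }

    // Fail-fast variant: a null argument is reported at the call site.
    public MemoryLoadSketch setMemoryLoadStrict(Number onHeap) {
        conf.put("onheap.memory.mb", Objects.requireNonNull(onHeap, "onHeap cannot be null"));
        return this;
    }

    public Number getMemoryLoad() {
        return conf.get("onheap.memory.mb");
    }
}
```

With the quiet variant a caller who passes null by mistake ends up with the cluster default and no indication of why, whereas the strict variant surfaces the mistake while the topology is being built.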



[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2113#discussion_r120676803
  
    --- Diff: storm-client/src/jvm/org/apache/storm/trident/operation/DefaultResourceDeclarer.java ---
    @@ -29,8 +34,12 @@
      */
     public class DefaultResourceDeclarer<T extends DefaultResourceDeclarer> implements ResourceDeclarer<T>, ITridentResource {
     
    -    private Map<String, Number> resources = new HashMap<>();
    -    private static Map<String, Object> conf = Utils.readStormConfig();
    +    //@{link org.apache.storm.trident.planner.Node} and several other tirdent classes inherit from DefaultResourceDeclarer
    +    // These classes are serialized out as part of the bolts and spouts of a topology, often for each bolt/spout in the topology.
    +    // The following are marked as transiant because they are never used after the topology is created so keeping them around just wasts
    --- End diff --
    
    wasts



[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2113#discussion_r120407010
  
    --- Diff: examples/storm-starter/src/jvm/org/apache/storm/starter/ResourceAwareExampleTopology.java ---
    @@ -34,65 +39,119 @@
     import org.apache.storm.tuple.Values;
     
     public class ResourceAwareExampleTopology {
    -  public static class ExclamationBolt extends BaseRichBolt {
    -    OutputCollector _collector;
    -
    -    @Override
    -    public void prepare(Map<String, Object> conf, TopologyContext context, OutputCollector collector) {
    -      _collector = collector;
    -    }
    -
    -    @Override
    -    public void execute(Tuple tuple) {
    -      _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
    -      _collector.ack(tuple);
    +    public static class ExclamationBolt extends BaseRichBolt {
    +        //Have a crummy cache to show off shared memory accounting
    +        private static final ConcurrentHashMap<String, String> myCrummyCache =
    +            new ConcurrentHashMap<>();
    +        private static final int CACHE_SIZE = 100_000;
    +        OutputCollector _collector;
    +
    +        protected static String getFromCache(String key) {
    +            return myCrummyCache.get(key);
    +        }
    +
    +        protected static void addToCache(String key, String value) {
    +            myCrummyCache.putIfAbsent(key, value);
    +            int numToRemove = myCrummyCache.size() - CACHE_SIZE;
    +            if (numToRemove > 0) {
    +                //Remove something randomly...
    +                Iterator<Entry<String, String>> it = myCrummyCache.entrySet().iterator();
    +                for (; numToRemove > 0 && it.hasNext(); numToRemove--) {
    +                    it.next();
    +                    it.remove();
    +                }
    +            }
    +        }
    +
    +        @Override
    +        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
    +            _collector = collector;
    +        }
    +
    +        @Override
    +        public void execute(Tuple tuple) {
    +            String orig = tuple.getString(0);
    +            String ret = getFromCache(orig);
    +            if (ret == null) {
    +                ret = orig + "!!!";
    +                addToCache(orig, ret);
    +            }
    +            _collector.emit(tuple, new Values(ret));
    +            _collector.ack(tuple);
    +        }
    +
    +        @Override
    +        public void declareOutputFields(OutputFieldsDeclarer declarer) {
    +            declarer.declare(new Fields("word"));
    +        }
         }
     
    -    @Override
    -    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    -      declarer.declare(new Fields("word"));
    +    public static void main(String[] args) throws Exception {
    +        TopologyBuilder builder = new TopologyBuilder();
    +
    +        //A topology can set resources in terms of CPU and Memory for each component
    +        // These can be chained (like with setting the CPU requirement)
    +        SpoutDeclarer spout = builder.setSpout("word", new TestWordSpout(), 10).setCPULoad(20);
    +        // Or done separately like with setting the
    +        // onheap and offheap memory requirement
    +        spout.setMemoryLoad(64, 16);
    +        //On heap memory is used to help calculate the heap of the java process for the worker
    +        // off heap memory is for things like JNI memory allocated off heap, or when using the
    +        // ShellBolt or ShellSpout.  In this case the 16 MB of off heap is just as an example
    +        // as we are not using it.
    +
    +        // Some times a Bolt or Spout will have some memory that is shared between the instances
    +        // These are typically caches, but could be anything like a static database that is memory
    +        // mapped into the processes. These can be declared separately and added to the bolts and
    +        // spouts that use them.  Or if only one uses it they can be created inline with the add
    +        SharedOnHeap exclaimCache = new SharedOnHeap(100, "exclaim-cache");
    +        SharedOffHeapWithinNode notImplementedButJustAnExample =
    +            new SharedOffHeapWithinNode(500, "not-implemented-node-level-cache");
    +
    +        //If CPU or memory is not set the values stored in topology.component.resources.onheap.memory.mb,
    +        // topology.component.resources.offheap.memory.mb and topology.component.cpu.pcore.percent
    +        // will be used instead
    +        builder
    +            .setBolt("exclaim1", new ExclamationBolt(), 3)
    +            .shuffleGrouping("word")
    +            .addSharedMemory(exclaimCache);
    +
    +        builder
    +            .setBolt("exclaim2", new ExclamationBolt(), 2)
    +            .shuffleGrouping("exclaim1")
    +            .setMemoryLoad(100)
    +            .addSharedMemory(exclaimCache)
    +            .addSharedMemory(notImplementedButJustAnExample);
    +
    +        Config conf = new Config();
    +        conf.setDebug(true);
    +
    +        //in RAS the number of workers will be computed for you so you don't need to set
    +        //conf.setNumWorkers(3);
    +
    +        // The size of a worker is limited by the amount of heap assigned to it and can be overridden by
    +        conf.setTopologyWorkerMaxHeapSize(1024.0);
    +        // This is to try and balance the time needed to devote to GC against not needing to
    +        // serialize/deserialize tuples
    +
    +        //The priority of a topology describes the importance of the topology in decreasing importance
    +        // starting from 0 (i.e. 0 is the highest priority and the priority importance decreases as the priority number increases).
    +        //Recommended range of 0-29 but no hard limit set.
    +        // If there are not enough resources in a cluster the priority in combination with how far over a guarantees
    +        // a user is will decide which topologies are run and which ones are not.
    +        conf.setTopologyPriority(29);
    +
    +        //set to use the default resource aware strategy when using the MultitenantResourceAwareBridgeScheduler
    +        conf.setTopologyStrategy(
    +            "org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy");
    +
    +        String topoName = "test";
    +        if (args != null && args.length > 0) {
    +            topoName = args[0];
    +        }
    +        //Not needed on RAS
    --- End diff --
    
    This was already mentioned



[GitHub] storm issue #2113: STORM-2497: Let Supervisor enforce memory and add in supp...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/2113
  
    I think I addressed all of the review comments



[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2113#discussion_r120432365
  
    --- Diff: storm-client/src/jvm/org/apache/storm/scheduler/SchedulerAssignment.java ---
    @@ -21,41 +21,51 @@
     import java.util.Map;
     import java.util.Set;
     
    +import org.apache.storm.generated.Assignment;
    +import org.apache.storm.generated.WorkerResources;
    +
     public interface SchedulerAssignment {
         /**
          * Does this slot occupied by this assignment?
          * @param slot
    -     * @return
    +     * @return true the slot is occupied else false
    --- End diff --
    
    Nitpick: missing "if" between true and the. Also the comment two lines up is phrased a little weirdly.



[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2113#discussion_r120451383
  
    --- Diff: storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java ---
    @@ -728,8 +758,145 @@ protected String javaCmd(String cmd) {
             
             return commandList;
         }
    +    
    +  @Override
    +  public boolean isMemoryLimitViolated(LocalAssignment withUpdatedLimits) throws IOException {
    +    if (super.isMemoryLimitViolated(withUpdatedLimits)) {
    +      return true;
    +    }
    +    if (_resourceIsolationManager != null) {
    +      // In the short term the goal is to not shoot anyone unless we really need to.
    +      // The on heap should limit the memory usage in most cases to a reasonable amount
    +      // If someone is using way more than they requested this is a bug and we should
    +      // not allow it
    +      long usageMb;
    +      long memoryLimitMb;
    +      long hardMemoryLimitOver;
    +      String typeOfCheck;
    +
    +      if (withUpdatedLimits.is_has_node_shared_memory()) {
    +        //We need to do enforcement on a topology level, not a single worker level...
    +        // Because in for cgroups each page in shared memory goes to the worker that touched it
    +        // first. We may need to make this more plugable in the future and let the resource
    +        // isolation manager tell us what to do
    +        usageMb = getTotalTopologyMemoryUsed();
    +        memoryLimitMb = getTotalTopologyMemoryReserved(withUpdatedLimits);
    +        hardMemoryLimitOver = this.hardMemoryLimitOver * getTotalWorkersForThisTopology();
    +        typeOfCheck = "TOPOLOGY " + _topologyId;
    +      } else {
    +        usageMb = getMemoryUsageMb();
    +        memoryLimitMb = this.memoryLimitMB;
    +        hardMemoryLimitOver = this.hardMemoryLimitOver;
    +        typeOfCheck = "WORKER " + _workerId;
    +      }
    +      LOG.debug(
    +          "Enforcing memory usage for {} with usgae of {} out of {} total and a hard limit of {}",
    +          typeOfCheck,
    +          usageMb,
    +          memoryLimitMb,
    +          hardMemoryLimitOver);
    +
    +      if (usageMb <= 0) {
    +        //Looks like usage might now be supported
    --- End diff --
    
    now -> not(?)
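
As a rough illustration of the branch quoted above: when node-level shared memory is present, the check runs against topology-wide totals, and the per-worker hard overage is scaled by the number of workers. The sketch below only mirrors that selection step. The class, the `Check` holder and all parameter names are hypothetical, and what the real method does with these figures afterwards is not shown in the quoted diff.

```java
// Hypothetical sketch of the selection step only; not the Storm implementation.
final class MemoryCheckSketch {
    /** The figures a single enforcement pass would work with. */
    static final class Check {
        final long usageMb;
        final long limitMb;
        final long hardOverMb;
        final String label;

        Check(long usageMb, long limitMb, long hardOverMb, String label) {
            this.usageMb = usageMb;
            this.limitMb = limitMb;
            this.hardOverMb = hardOverMb;
            this.label = label;
        }
    }

    static Check select(boolean hasNodeSharedMemory,
                        long workerUsageMb, long workerLimitMb, long workerHardOverMb,
                        long topoUsageMb, long topoReservedMb, int workersForTopology,
                        String topologyId, String workerId) {
        if (hasNodeSharedMemory) {
            // Shared pages are charged to whichever worker touched them first,
            // so compare topology-wide totals instead of one worker's numbers.
            return new Check(topoUsageMb, topoReservedMb,
                workerHardOverMb * workersForTopology, "TOPOLOGY " + topologyId);
        }
        return new Check(workerUsageMb, workerLimitMb, workerHardOverMb, "WORKER " + workerId);
    }

    public static void main(String[] args) {
        Check c = select(true, 300, 512, 100, 900, 1536, 3, "topo-1", "worker-a");
        System.out.println(c.label + " usage=" + c.usageMb + " limit=" + c.limitMb
            + " hardOver=" + c.hardOverMb);
    }
}
```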



[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2113#discussion_r120406827
  
    --- Diff: examples/storm-starter/src/jvm/org/apache/storm/starter/ResourceAwareExampleTopology.java ---
    @@ -34,65 +39,119 @@
     import org.apache.storm.tuple.Values;
     
     public class ResourceAwareExampleTopology {
    -  public static class ExclamationBolt extends BaseRichBolt {
    -    OutputCollector _collector;
    -
    -    @Override
    -    public void prepare(Map<String, Object> conf, TopologyContext context, OutputCollector collector) {
    -      _collector = collector;
    -    }
    -
    -    @Override
    -    public void execute(Tuple tuple) {
    -      _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
    -      _collector.ack(tuple);
    +    public static class ExclamationBolt extends BaseRichBolt {
    +        //Have a crummy cache to show off shared memory accounting
    +        private static final ConcurrentHashMap<String, String> myCrummyCache =
    +            new ConcurrentHashMap<>();
    +        private static final int CACHE_SIZE = 100_000;
    +        OutputCollector _collector;
    +
    +        protected static String getFromCache(String key) {
    +            return myCrummyCache.get(key);
    +        }
    +
    +        protected static void addToCache(String key, String value) {
    +            myCrummyCache.putIfAbsent(key, value);
    +            int numToRemove = myCrummyCache.size() - CACHE_SIZE;
    +            if (numToRemove > 0) {
    +                //Remove something randomly...
    +                Iterator<Entry<String, String>> it = myCrummyCache.entrySet().iterator();
    +                for (; numToRemove > 0 && it.hasNext(); numToRemove--) {
    +                    it.next();
    +                    it.remove();
    +                }
    +            }
    +        }
    +
    +        @Override
    +        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
    +            _collector = collector;
    +        }
    +
    +        @Override
    +        public void execute(Tuple tuple) {
    +            String orig = tuple.getString(0);
    +            String ret = getFromCache(orig);
    +            if (ret == null) {
    +                ret = orig + "!!!";
    +                addToCache(orig, ret);
    +            }
    +            _collector.emit(tuple, new Values(ret));
    +            _collector.ack(tuple);
    +        }
    +
    +        @Override
    +        public void declareOutputFields(OutputFieldsDeclarer declarer) {
    +            declarer.declare(new Fields("word"));
    +        }
         }
     
    -    @Override
    -    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    -      declarer.declare(new Fields("word"));
    +    public static void main(String[] args) throws Exception {
    +        TopologyBuilder builder = new TopologyBuilder();
    +
    +        //A topology can set resources in terms of CPU and Memory for each component
    +        // These can be chained (like with setting the CPU requirement)
    +        SpoutDeclarer spout = builder.setSpout("word", new TestWordSpout(), 10).setCPULoad(20);
    +        // Or done separately like with setting the
    +        // onheap and offheap memory requirement
    +        spout.setMemoryLoad(64, 16);
    +        //On heap memory is used to help calculate the heap of the java process for the worker
    +        // off heap memory is for things like JNI memory allocated off heap, or when using the
    +        // ShellBolt or ShellSpout.  In this case the 16 MB of off heap is just as an example
    +        // as we are not using it.
    +
    +        // Some times a Bolt or Spout will have some memory that is shared between the instances
    +        // These are typically caches, but could be anything like a static database that is memory
    +        // mapped into the processes. These can be declared separately and added to the bolts and
    +        // spouts that use them.  Or if only one uses it they can be created inline with the add
    +        SharedOnHeap exclaimCache = new SharedOnHeap(100, "exclaim-cache");
    +        SharedOffHeapWithinNode notImplementedButJustAnExample =
    +            new SharedOffHeapWithinNode(500, "not-implemented-node-level-cache");
    +
    +        //If CPU or memory is not set the values stored in topology.component.resources.onheap.memory.mb,
    +        // topology.component.resources.offheap.memory.mb and topology.component.cpu.pcore.percent
    +        // will be used instead
    +        builder
    +            .setBolt("exclaim1", new ExclamationBolt(), 3)
    +            .shuffleGrouping("word")
    +            .addSharedMemory(exclaimCache);
    +
    +        builder
    +            .setBolt("exclaim2", new ExclamationBolt(), 2)
    +            .shuffleGrouping("exclaim1")
    +            .setMemoryLoad(100)
    +            .addSharedMemory(exclaimCache)
    +            .addSharedMemory(notImplementedButJustAnExample);
    +
    +        Config conf = new Config();
    +        conf.setDebug(true);
    +
    +        //in RAS the number of workers will be computed for you so you don't need to set
    +        //conf.setNumWorkers(3);
    +
    +        // The size of a worker is limited by the amount of heap assigned to it and can be overridden by
    +        conf.setTopologyWorkerMaxHeapSize(1024.0);
    +        // This is to try and balance the time needed to devote to GC against not needing to
    +        // serialize/deserialize tuples
    --- End diff --
    
    Not sure I understand this. Do more executors end up in a worker if the max heap size is larger, or how does this work?



[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2113#discussion_r120677418
  
    --- Diff: storm-server/src/test/java/org/apache/storm/daemon/nimbus/NimbusTest.java ---
    @@ -24,8 +24,10 @@
     import org.apache.storm.topology.TopologyBuilder;
     import org.junit.Test;
     
    +import static org.junit.Assert.*;
    --- End diff --
    
    I think checkstyle bans wildcard imports like this one.



[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2113#discussion_r120654636
  
    --- Diff: storm-server/src/main/java/org/apache/storm/container/cgroup/CgroupManager.java ---
    @@ -151,22 +161,40 @@ public void reserveResourcesForWorker(String workerId, Map<String, Number> resou
                 }
             }
     
    -        if (totalMem != null) {
    -            MemoryCore memCore = (MemoryCore) workerGroup.getCores().get(SubSystemType.memory);
    -            try {
    -                memCore.setPhysicalUsageLimit(Long.valueOf(totalMem.longValue() * 1024 * 1024));
    -            } catch (IOException e) {
    -                throw new RuntimeException("Cannot set memory.limit_in_bytes! Exception: ", e);
    +        if ((boolean) this.conf.get(DaemonConfig.STORM_CGROUP_MEMORY_ENFORCEMENT_ENABLE)) {
    +            if (totalMem != null) {
    +                int cgroupMem =
    +                    (int)
    +                        (Math.ceil(
    +                            ObjectReader.getDouble(
    +                                this.conf.get(DaemonConfig.STORM_CGROUP_MEMORY_LIMIT_TOLERANCE_MARGIN_MB),
    --- End diff --
    
    Except in some unit tests.



[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2113#discussion_r120444941
  
    --- Diff: storm-server/src/main/java/org/apache/storm/DaemonConfig.java ---
    @@ -889,10 +890,83 @@
         public static String STORM_CGROUP_CGEXEC_CMD = "storm.cgroup.cgexec.cmd";
     
         /**
    -     * The amount of memory a worker can exceed its allocation before cgroup will kill it
    +     * Please use STORM_SUPERVISOR_MEMORY_LIMIT_TOLERANCE_MARGIN_MB instead. The amount of memory a
    +     * worker can exceed its allocation before cgroup will kill it.
    +     */
    +    @isPositiveNumber(includeZero = true)
    +    public static String STORM_CGROUP_MEMORY_LIMIT_TOLERANCE_MARGIN_MB =
    +        "storm.cgroup.memory.limit.tolerance.margin.mb";
    +
    +    /**
    +     * Java does not always play nicely with cgroups. It is coming but not fully implemented and not
    +     * for the way storm uses cgroups. In the short term you can disable the hard memory enforcement
    +     * by cgroups and let the supervisor handle shooting workers going over their limit in a kinder
    +     * way.
    +     */
    +    @isBoolean
    +    public static String STORM_CGROUP_MEMORY_ENFORCEMENT_ENABLE = "storm.cgroup.memory.enforcement.enable";
    +
    +    // Configs for memory enforcement does by the supervisor (not cgroups directly)
    +
    +    /**
    +     * Memory given to each worker for free (because java and storm have some overhead). This is
    +     * memory on the box that the workers can use. This should not be included in
    +     * SUPERVISOR_MEMORY_CAPACITY_MB, as nimbus does not use this memory for scheduling.
    +     */
    +    @isPositiveNumber
    +    public static String STORM_SUPERVISOR_MEMORY_LIMIT_TOLERANCE_MARGIN_MB =
    +        "storm.supervisor.memory.limit.tolerance.margin.mb";
    +
    +    /**
    +     * A multiplier for the memory limit of a worker that will have the supervisor shoot it
    +     * immediately. 1.0 means shoot the worker as soon as it goes over. 2.0 means shoot the worker if
    +     * its usage is double what was requested. This value is combined with
    +     * STORM_SUPERVISOR_HARD_MEMORY_LIMIT_OVERAGE and which ever is greater is used for enforcement.
    +     * This allows small workers to not be shot.
    +     */
    +    @isPositiveNumber
    +    public static String STORM_SUPERVISOR_HARD_MEMORY_LIMIT_MULTIPLIER =
    +        "storm.supervisor.hard.memory.limit.multiplier";
    +
    +    /**
    +     * If the memory usage of a worker goes over its limit by this value is it shot immediately. This
    +     * value is combined with STORM_SUPERVISOR_HARD_LIMIT_MEMORY_MULTIPLIER and which ever is greater
    +     * is used for enforcement. This allows small workers to not be shot.
    +     */
    +    @isPositiveNumber(includeZero = true)
    +    public static String STORM_SUPERVISOR_HARD_LIMIT_MEMORY_OVERAGE = "storm.supervisor.hard.memory.limit.overage";
    +
    +    /**
    +     * If the amount of memory that is free in the system (either on the box or in the supervisor's
    +     * cgroup) is below this number (in MB) consider the system to be in low memory mode and start
    +     * shooting workers if they are over their limit.
    +     */
    +    @isPositiveNumber
    +    public static String STORM_SUPERVISOR_LOW_MEMORY_THRESHOLD = "storm.supervisor.low.memory.threshold";
    +
    +    /**
    +     * If the amount of memory that is free in the system (either on the box or in the supervisor's
    +     * cgroup) is below this number (in MB) consider the system to be a little low on memory and start
    +     * shooting workers if they are over their limit for a given grace period
    +     * STORM_SUPERVISOR_MEDIUM_MEMORY_GRACE_PERIOD.
    +     */
    +    @isPositiveNumber
    +    public static String STORM_SUPERVISOR_MEDIUM_MEMORY_THRESHOLD = "storm.supervisor.medium.memory.threshold";
    +
    +    /**
    +     * The number of milliseconds that a worker is allowed to be over their limit when there is a
    +     * medium amount of memory free in the system.
          */
         @isPositiveNumber
    -    public static String STORM_CGROUP_MEMORY_LIMIT_TOLERANCE_MARGIN_MB = "storm.cgroup.memory.limit.tolerance.margin.mb";
    +    public static String STORM_SUPERVISOR_MEDIUM_MEMORY_GRACE_PERIOD =
    +        "storm.supervisor.medium.memory.grace.period";
    +
    +    /**
    +     * @{see Config#TOPOLOGY_SCHEDULER_STRATEGY} this allows us to validate on the server side that it is
    --- End diff --
    
    Could this comment be rephrased? I'm a little confused what this should be set to, or what it does.
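
For readers trying to follow the hard-limit settings in the quoted Javadoc, one plausible reading of "which ever is greater is used for enforcement" is that the hard limit is the larger of `limit * multiplier` and `limit + overage`. The sketch below assumes that reading. The class and method names are hypothetical, and the multiplier and overage values are illustrative, not Storm defaults.

```java
// Hypothetical sketch based on one reading of the quoted Javadoc; not the Storm implementation.
final class HardLimitSketch {
    /** Larger of the multiplied limit and the limit plus a fixed overage, in MB. */
    static double hardLimitMb(double limitMb, double multiplier, double overageMb) {
        return Math.max(limitMb * multiplier, limitMb + overageMb);
    }

    /** True when usage is above the hard limit and the worker would be shot immediately. */
    static boolean overHardLimit(double usageMb, double limitMb, double multiplier, double overageMb) {
        return usageMb > hardLimitMb(limitMb, multiplier, overageMb);
    }

    public static void main(String[] args) {
        // Small worker: the fixed overage dominates, so a brief spike is tolerated.
        System.out.println(hardLimitMb(128, 2.0, 2048));
        // Large worker: the multiplier dominates.
        System.out.println(hardLimitMb(4096, 2.0, 2048));
    }
}
```

Under this reading the fixed overage is what "allows small workers to not be shot": a 128 MB worker doubling its usage stays well below the combined limit.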



[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2113#discussion_r120407132
  
    --- Diff: storm-client/src/jvm/org/apache/storm/Config.java ---
    @@ -242,7 +241,8 @@
          * The strategy to use when scheduling a topology with Resource Aware Scheduler
          */
         @NotNull
    -    @isImplementationOfClass(implementsClass = IStrategy.class)
    +    @isString
    +    //TODO @isImplementationOfClass(implementsClass = IStrategy.class)
    --- End diff --
    
    TODO



[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2113#discussion_r120675985
  
    --- Diff: storm-client/src/jvm/org/apache/storm/scheduler/SchedulerAssignment.java ---
    @@ -21,41 +21,52 @@
     import java.util.Map;
     import java.util.Set;
     
    +import org.apache.storm.generated.Assignment;
    +import org.apache.storm.generated.WorkerResources;
    +
     public interface SchedulerAssignment {
         /**
          * Does this slot occupied by this assignment?
          * @param slot
    -     * @return
    +     * @return true if the slot is occupied else false
          */
         public boolean isSlotOccupied(WorkerSlot slot);
     
         /**
    -     * is the executor assigned?
    +     * Is the executor assigned?
          * 
          * @param executor
    -     * @return
    +     * @return true if it is assigned else false
          */
         public boolean isExecutorAssigned(ExecutorDetails executor);
         
         /**
    -     * get the topology-id this assignment is for.
    -     * @return
    +     * Return the ID of the topolgoy.
    --- End diff --
    
    topolgoy -> topology



[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2113#discussion_r120668119
  
    --- Diff: storm-client/src/jvm/org/apache/storm/topology/BaseConfigurationDeclarer.java ---
    @@ -52,18 +52,20 @@ public T setNumTasks(Number val) {
             return addConfiguration(Config.TOPOLOGY_TASKS, val);
         }
     
    +    @SuppressWarnings("unchecked")
         @Override
         public T setMemoryLoad(Number onHeap) {
             if (onHeap != null) {
    --- End diff --
    
    Thanks



[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2113#discussion_r120668059
  
    --- Diff: storm-client/src/jvm/org/apache/storm/scheduler/SchedulerAssignmentImpl.java ---
    @@ -19,36 +19,82 @@
     
     import java.util.ArrayList;
     import java.util.Collection;
    +import java.util.Collections;
     import java.util.HashMap;
     import java.util.HashSet;
     import java.util.LinkedList;
     import java.util.List;
     import java.util.Map;
     import java.util.Set;
     
    -//TODO: improve this by maintaining slot -> executors as well for more efficient operations
    +import org.apache.storm.generated.WorkerResources;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
     public class SchedulerAssignmentImpl implements SchedulerAssignment {
    +    private static final Logger LOG = LoggerFactory.getLogger(SchedulerAssignmentImpl.class);
    +
         /**
          * topology-id this assignment is for.
          */
    -    String topologyId;
    +    private final String topologyId;
    +
         /**
          * assignment detail, a mapping from executor to <code>WorkerSlot</code>
          */
    -    Map<ExecutorDetails, WorkerSlot> executorToSlot;
    -    
    -    public SchedulerAssignmentImpl(String topologyId, Map<ExecutorDetails, WorkerSlot> executorToSlots) {
    -        this.topologyId = topologyId;
    -        this.executorToSlot = new HashMap<>(0);
    -        if (executorToSlots != null) {
    -            this.executorToSlot.putAll(executorToSlots);
    +    private final Map<ExecutorDetails, WorkerSlot> executorToSlot = new HashMap<>();
    +    private final Map<WorkerSlot, WorkerResources> resources = new HashMap<>();
    +    private final Map<String, Double> totalSharedOffHeap = new HashMap<>();
    +
    +    public SchedulerAssignmentImpl(String topologyId, Map<ExecutorDetails, WorkerSlot> executorToSlot,
    +            Map<WorkerSlot, WorkerResources> resources, Map<String, Double> totalSharedOffHeap) {
    +        this.topologyId = topologyId;       
    +        if (executorToSlot != null) {
    --- End diff --
    
    Ok. I'm the opposite way, I prefer avoiding null checks. But this is fine :)



[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2113#discussion_r120459423
  
    --- Diff: storm-server/src/main/java/org/apache/storm/container/cgroup/CgroupManager.java ---
    @@ -151,22 +161,40 @@ public void reserveResourcesForWorker(String workerId, Map<String, Number> resou
                 }
             }
     
    -        if (totalMem != null) {
    -            MemoryCore memCore = (MemoryCore) workerGroup.getCores().get(SubSystemType.memory);
    -            try {
    -                memCore.setPhysicalUsageLimit(Long.valueOf(totalMem.longValue() * 1024 * 1024));
    -            } catch (IOException e) {
    -                throw new RuntimeException("Cannot set memory.limit_in_bytes! Exception: ", e);
    +        if ((boolean) this.conf.get(DaemonConfig.STORM_CGROUP_MEMORY_ENFORCEMENT_ENABLE)) {
    +            if (totalMem != null) {
    +                int cgroupMem =
    +                    (int)
    +                        (Math.ceil(
    +                            ObjectReader.getDouble(
    +                                this.conf.get(DaemonConfig.STORM_CGROUP_MEMORY_LIMIT_TOLERANCE_MARGIN_MB),
    --- End diff --
    
    I'm wondering if having defaults here is necessary? Isn't defaults.yaml always expected to be present?



[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2113#discussion_r120645730
  
    --- Diff: storm-client/src/jvm/org/apache/storm/scheduler/SchedulerAssignmentImpl.java ---
    @@ -141,4 +222,42 @@ public String getTopologyId() {
             }
             return ret;
         }
    -}
    +
    +    @Override
    +    public Map<WorkerSlot, WorkerResources> getScheduledResources() {
    +        return resources;
    +    }
    +
    +    public void setTotalSharedOffHeapMemory(String node, double value) {
    +        totalSharedOffHeap.put(node, value);
    +    }
    +    
    +    @Override
    +    public Map<String, Double> getTotalSharedOffHeapMemory() {
    +        return totalSharedOffHeap;
    +    }
    +
    +    /**
    +     * Update the resources for this assignment (This should go aware when the RAS-MT bridge goes away
    --- End diff --
    
    Sorry, that is code I forgot to remove; it is not needed here.



[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2113#discussion_r120454284
  
    --- Diff: storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java ---
    @@ -559,9 +688,17 @@ public void cleanUpForRestart() throws IOException {
         public abstract boolean runProfiling(ProfileRequest request, boolean stop) throws IOException, InterruptedException;
     
         /**
    -     * @return the id of the container or null if there is no worker id right now.
    +     * Get the id of the container or null if there is no worker id right now.
          */
         public String getWorkerId() {
             return _workerId;
         }
    +
    +    /**
    +     * Get the topology id this is a part of.
    +     */
    +    public String getTopoogyId() {
    --- End diff --
    
    topoogy -> topology



[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2113#discussion_r120404519
  
    --- Diff: docs/Resource_Aware_Scheduler_overview.md ---
    @@ -45,38 +46,53 @@ For a Storm Topology, the user can now specify the amount of resources a topolog
     ### Setting Memory Requirement
     
     API to set component memory requirement:
    -
    +```
         public T setMemoryLoad(Number onHeap, Number offHeap)
    -
    +```
     Parameters:
     * Number onHeap – The amount of on heap memory an instance of this component will consume in megabytes
     * Number offHeap – The amount of off heap memory an instance of this component will consume in megabytes
     
     The user also has to option to just specify the on heap memory requirement if the component does not have an off heap memory need.
    -
    +```
         public T setMemoryLoad(Number onHeap)
    -
    +```
     Parameters:
     * Number onHeap – The amount of on heap memory an instance of this component will consume
     
     If no value is provided for offHeap, 0.0 will be used. If no value is provided for onHeap, or if the API is never called for a component, the default value will be used.
     
     Example of Usage:
    -
    +```
         SpoutDeclarer s1 = builder.setSpout("word", new TestWordSpout(), 10);
         s1.setMemoryLoad(1024.0, 512.0);
         builder.setBolt("exclaim1", new ExclamationBolt(), 3)
                     .shuffleGrouping("word").setMemoryLoad(512.0);
    -
    +```
     The entire memory requested for this topology is 16.5 GB. That is from 10 spouts with 1GB on heap memory and 0.5 GB off heap memory each and 3 bolts with 0.5 GB on heap memory each.
     
    +<div id='Setting-Shared-Memory'/>
    +### Shared Memory
    +
    +In some cases you may have memory that is shared between components. It may be a cache shared within a worker between instances of a bolt, or it might be static data that is memory mapped into a bolt and is shared accross a worker.  In any case you can specify your share memory request by
    +creating one of `SharedOffHeapWithinNode`, `SharedOffHeapWithinWorker`, or `SharedOnHeap` and adding it to bolts and spouts that use that shared memory.
    --- End diff --
    
    Is there a difference between `SharedOnHeap` and just declaring a static field?
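
The 16.5 GB total stated in the quoted documentation can be checked directly: 10 spout instances at 1024 MB on heap plus 512 MB off heap, and 3 bolt instances at 512 MB on heap. The sketch below just redoes that arithmetic. The class and method names are hypothetical and it does not call any Storm API.

```java
// Hypothetical helper that redoes the arithmetic from the quoted docs; not a Storm API.
final class MemoryRequestSketch {
    /** Memory requested by one component: instances times (on heap + off heap), in MB. */
    static double componentMb(int parallelism, double onHeapMb, double offHeapMb) {
        return parallelism * (onHeapMb + offHeapMb);
    }

    public static void main(String[] args) {
        double spoutMb = componentMb(10, 1024.0, 512.0); // 10 * 1536 = 15360 MB
        double boltMb = componentMb(3, 512.0, 0.0);      // 3 * 512 = 1536 MB
        double totalGb = (spoutMb + boltMb) / 1024.0;    // 16896 MB / 1024 = 16.5 GB
        System.out.println(totalGb + " GB");
    }
}
```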



[GitHub] storm issue #2113: STORM-2497: Let Supervisor enforce memory and add in supp...

Posted by jerrypeng <gi...@git.apache.org>.
Github user jerrypeng commented on the issue:

    https://github.com/apache/storm/pull/2113
  
    cool feature



[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2113#discussion_r120434212
  
    --- Diff: storm-client/src/jvm/org/apache/storm/scheduler/SchedulerAssignmentImpl.java ---
    @@ -61,44 +107,79 @@ public int hashCode() {
         
         @Override
         public boolean equals(Object other) {
    -        if (other == this) return true;
    -        if (other instanceof SchedulerAssignmentImpl) {
    -            SchedulerAssignmentImpl sother = (SchedulerAssignmentImpl) other;
    -            return topologyId.equals(sother.topologyId) &&
    -                    executorToSlot.equals(sother.executorToSlot);
    +        if (!equalsIgnoreResources(other)) {
    +            return false;
             }
    -        return false;
    +        SchedulerAssignmentImpl o = (SchedulerAssignmentImpl) other;
    +        //Normalize some things
    +        Map<WorkerSlot, WorkerResources> selfResources = this.resources;
    +        if (selfResources == null) selfResources = Collections.emptyMap();
    --- End diff --
    
    These maps are never null?



[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2113#discussion_r120439921
  
    --- Diff: storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java ---
    @@ -559,6 +565,23 @@ public T addConfigurations(Map<String, Object> conf) {
                 _commons.get(_id).set_json_conf(mergeIntoJson(parseJson(currConf), conf));
                 return (T) this;
             }
    +        
    +        @SuppressWarnings("unchecked")
    +        @Override
    +        public T addSharedMemory(SharedMemory request) {
    +            SharedMemory found = _sharedMemory.get(request.get_name());
    +            if (found != null && !found.equals(request)) {
    +                throw new IllegalArgumentException("Cannot have multiple different shared memory regions with the same name");
    +            }
    +            _sharedMemory.put(request.get_name(), request);
    +            Set<String> mems = _componentToSharedMemory.get(_id);
    --- End diff --
    
    Nit: Could be shortened to `_componentToSharedMemory.computeIfAbsent(_id, HashSet::new)`
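
A note on the suggested shorthand, as a minimal sketch: it assumes `_componentToSharedMemory` is a `Map<String, Set<String>>`, which the quoted diff does not show. Under that assumption `HashSet::new` would have to resolve to a `HashSet` constructor that accepts a `String` key, and no such constructor exists, so a lambda that ignores the key is the safer spelling. `SharedMemoryIndex` and its methods are hypothetical names used only for illustration.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class SharedMemoryIndex {
    // Hypothetical stand-in for _componentToSharedMemory in the diff.
    private final Map<String, Set<String>> componentToSharedMemory = new HashMap<>();

    // Registers a region name for a component, creating the set on first use.
    public void add(String componentId, String regionName) {
        componentToSharedMemory
            .computeIfAbsent(componentId, k -> new HashSet<>())
            .add(regionName);
    }

    // Returns the registered region names, or an empty set for unknown components.
    public Set<String> regionsFor(String componentId) {
        return componentToSharedMemory.getOrDefault(componentId, new HashSet<>());
    }
}
```

The lambda form also avoids the case where a method reference silently binds to `HashSet(int)` when the key type happens to be `Integer`.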



[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2113#discussion_r120676761
  
    --- Diff: storm-client/src/jvm/org/apache/storm/trident/operation/DefaultResourceDeclarer.java ---
    @@ -29,8 +34,12 @@
      */
     public class DefaultResourceDeclarer<T extends DefaultResourceDeclarer> implements ResourceDeclarer<T>, ITridentResource {
     
    -    private Map<String, Number> resources = new HashMap<>();
    -    private static Map<String, Object> conf = Utils.readStormConfig();
    +    //@{link org.apache.storm.trident.planner.Node} and several other tirdent classes inherit from DefaultResourceDeclarer
    --- End diff --
    
    tirdent



[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2113#discussion_r120440846
  
    --- Diff: storm-client/src/jvm/org/apache/storm/trident/operation/DefaultResourceDeclarer.java ---
    @@ -29,8 +33,8 @@
      */
     public class DefaultResourceDeclarer<T extends DefaultResourceDeclarer> implements ResourceDeclarer<T>, ITridentResource {
     
    -    private Map<String, Number> resources = new HashMap<>();
    -    private static Map<String, Object> conf = Utils.readStormConfig();
    +    private final transient Map<String, Number> resources = new HashMap<>();
    --- End diff --
    
    Why transient?
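
For context on why the modifier matters, here is a minimal sketch. It assumes the declarer is written with standard Java serialization, which this thread does not confirm. A `transient` field is skipped when the object is written, and field initializers are not re-run when a `Serializable` object is read back, so a `final transient` map comes back as `null`. `TransientSketch`, `Declarer` and `roundTrip` are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

public class TransientSketch {
    // Hypothetical stand-in for a resource declarer.
    static class Declarer implements Serializable {
        private static final long serialVersionUID = 1L;
        // Skipped by serialization; not re-initialized on read.
        final transient Map<String, Number> resources = new HashMap<>();
        // Serialized normally, for comparison.
        final Map<String, Number> kept = new HashMap<>();
    }

    // Serializes the object to a byte array and reads it back.
    static Declarer roundTrip(Declarer in) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(in);
        }
        try (ObjectInputStream is =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (Declarer) is.readObject();
        }
    }
}
```

If the field must survive a round trip, it would need to be non-transient or rebuilt in a `readObject` hook.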



[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2113#discussion_r120435642
  
    --- Diff: storm-client/src/jvm/org/apache/storm/scheduler/SchedulerAssignmentImpl.java ---
    @@ -141,4 +222,42 @@ public String getTopologyId() {
             }
             return ret;
         }
    -}
    +
    +    @Override
    +    public Map<WorkerSlot, WorkerResources> getScheduledResources() {
    +        return resources;
    +    }
    +
    +    public void setTotalSharedOffHeapMemory(String node, double value) {
    +        totalSharedOffHeap.put(node, value);
    +    }
    +    
    +    @Override
    +    public Map<String, Double> getTotalSharedOffHeapMemory() {
    +        return totalSharedOffHeap;
    +    }
    +
    +    /**
    +     * Update the resources for this assignment (This should go aware when the RAS-MT bridge goes away
    --- End diff --
    
    Nit: aware -> away. What is MT btw?



[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2113#discussion_r120677469
  
    --- Diff: storm-server/src/test/java/org/apache/storm/daemon/nimbus/NimbusTest.java ---
    @@ -44,6 +46,11 @@ public void testMemoryLoadLargerThanMaxHeapSize() throws Exception {
             config1.put(Config.TOPOLOGY_SUBMITTER_USER, "zhuo");
             config1.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 128.0);
             config1.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 129.0);
    -        Nimbus.validateTopologyWorkerMaxHeapSizeConfigs(config1, stormTopology1);
    +        try {
    +            Nimbus.validateTopologyWorkerMaxHeapSizeConfigs(config1, stormTopology1);
    +            fail("Expected exeption not thrown");
    --- End diff --
    
    exeption



[GitHub] storm issue #2113: STORM-2497: Let Supervisor enforce memory and add in supp...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/2113
  
    @srdo I think I got the rest of them
    
    @kishorvpatil could you take a look and let me know if the rework is good for you?



[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2113#discussion_r120644839
  
    --- Diff: storm-client/src/jvm/org/apache/storm/scheduler/SchedulerAssignmentImpl.java ---
    @@ -19,36 +19,82 @@
     
     import java.util.ArrayList;
     import java.util.Collection;
    +import java.util.Collections;
     import java.util.HashMap;
     import java.util.HashSet;
     import java.util.LinkedList;
     import java.util.List;
     import java.util.Map;
     import java.util.Set;
     
    -//TODO: improve this by maintaining slot -> executors as well for more efficient operations
    +import org.apache.storm.generated.WorkerResources;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
     public class SchedulerAssignmentImpl implements SchedulerAssignment {
    +    private static final Logger LOG = LoggerFactory.getLogger(SchedulerAssignmentImpl.class);
    +
         /**
          * topology-id this assignment is for.
          */
    -    String topologyId;
    +    private final String topologyId;
    +
         /**
          * assignment detail, a mapping from executor to <code>WorkerSlot</code>
          */
    -    Map<ExecutorDetails, WorkerSlot> executorToSlot;
    -    
    -    public SchedulerAssignmentImpl(String topologyId, Map<ExecutorDetails, WorkerSlot> executorToSlots) {
    -        this.topologyId = topologyId;
    -        this.executorToSlot = new HashMap<>(0);
    -        if (executorToSlots != null) {
    -            this.executorToSlot.putAll(executorToSlots);
    +    private final Map<ExecutorDetails, WorkerSlot> executorToSlot = new HashMap<>();
    +    private final Map<WorkerSlot, WorkerResources> resources = new HashMap<>();
    +    private final Map<String, Double> totalSharedOffHeap = new HashMap<>();
    +
    +    public SchedulerAssignmentImpl(String topologyId, Map<ExecutorDetails, WorkerSlot> executorToSlot,
    +            Map<WorkerSlot, WorkerResources> resources, Map<String, Double> totalSharedOffHeap) {
    +        this.topologyId = topologyId;       
    +        if (executorToSlot != null) {
    --- End diff --
    
    I prefer to be very lenient with inputs. If something can be null, I don't want to force others to never pass null.



[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2113#discussion_r120671482
  
    --- Diff: storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java ---
    @@ -144,8 +148,19 @@ public Void call() throws Exception {
                     File tr = new File(tmproot);
                     try {
                         downloadBaseBlobs(tr);
    +                    if (_assignment.is_set_total_node_shared()) {
    +                        File sharedMemoryDirTmpLocation = new File(tr, "shared_by_topology");
    +                        //We need to create a directory for shared memory to write to (we should not encourage this though)
    +                        Path path = sharedMemoryDirTmpLocation.toPath();
    +                        Files.createDirectories(path);
    +                    }
                         _fsOps.moveDirectoryPreferAtomic(tr, _stormRoot);
    -                    _fsOps.setupStormCodeDir(ConfigUtils.readSupervisorStormConf(_conf, _topologyId), _stormRoot);
    +                    Map<String, Object> topoConf = ConfigUtils.readSupervisorStormConf(_conf, _topologyId);
    +                    _fsOps.setupStormCodeDir(topoConf, _stormRoot);
    +                    if (_assignment.is_has_node_shared_memory()) {
    +                        File sharedMemoryDir = new File(_stormRoot, "shared_by_topology");
    --- End diff --
    
    I can add it to the documentation, but I deliberately did not spend a lot of time documenting it. Off-heap shared memory is hard to get right and I would rather discourage people from using it.



[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2113#discussion_r120668301
  
    --- Diff: storm-server/src/main/java/org/apache/storm/container/cgroup/CgroupManager.java ---
    @@ -151,22 +161,40 @@ public void reserveResourcesForWorker(String workerId, Map<String, Number> resou
                 }
             }
     
    -        if (totalMem != null) {
    -            MemoryCore memCore = (MemoryCore) workerGroup.getCores().get(SubSystemType.memory);
    -            try {
    -                memCore.setPhysicalUsageLimit(Long.valueOf(totalMem.longValue() * 1024 * 1024));
    -            } catch (IOException e) {
    -                throw new RuntimeException("Cannot set memory.limit_in_bytes! Exception: ", e);
    +        if ((boolean) this.conf.get(DaemonConfig.STORM_CGROUP_MEMORY_ENFORCEMENT_ENABLE)) {
    +            if (totalMem != null) {
    +                int cgroupMem =
    +                    (int)
    +                        (Math.ceil(
    +                            ObjectReader.getDouble(
    +                                this.conf.get(DaemonConfig.STORM_CGROUP_MEMORY_LIMIT_TOLERANCE_MARGIN_MB),
    --- End diff --
    
    Makes sense



[GitHub] storm issue #2113: STORM-2497: Let Supervisor enforce memory and add in supp...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/2113
  
    @srdo @kishorvpatil @jerrypeng 
    
    I just rebased this on the latest master and added a few caches to some of the internal data structures to help with performance issues we had seen in production.  Please take another look. I would really like to get this in sooner rather than later, as we have other work planned for the RAS scheduler to add generic resource support (our goal is to officially support GPUs).



[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2113#discussion_r120456638
  
    --- Diff: storm-server/src/test/java/org/apache/storm/scheduler/resource/TestResourceAwareScheduler.java ---
    @@ -46,53 +44,43 @@
     import org.slf4j.LoggerFactory;
     
     import java.util.HashMap;
    -import java.util.Iterator;
     import java.util.LinkedList;
     import java.util.List;
     import java.util.Map;
    +import java.util.Map.Entry;
     import java.util.Set;
     import java.util.ArrayList;
     import java.util.HashSet;
     import java.util.Collection;
     import java.util.Collections;
     
    -import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.genExecsAndComps;
    +import static org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler.*;
     
     public class TestResourceAwareScheduler {
     
    -    private final String TOPOLOGY_SUBMITTER = "jerry";
    -
         private static final Logger LOG = LoggerFactory.getLogger(TestResourceAwareScheduler.class);
     
         private static int currentTime = 1450418597;
     
    -    private static final Config defaultTopologyConf = new Config();
    +    private static final Config defaultTopologyConf = createClusterConfig(10, 128, 0, null);
     
         @BeforeClass
         public static void initConf() {
    -        defaultTopologyConf.put(Config.STORM_NETWORK_TOPOGRAPHY_PLUGIN, "org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping");
    -        defaultTopologyConf.put(DaemonConfig.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY, org.apache.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy.class.getName());
    -        defaultTopologyConf.put(DaemonConfig.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY, org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy.class.getName());
    -
    -        defaultTopologyConf.put(Config.TOPOLOGY_SCHEDULER_STRATEGY, org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy.class.getName());
    -        defaultTopologyConf.put(Config.TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT, 10.0);
    -        defaultTopologyConf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB, 128.0);
    -        defaultTopologyConf.put(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB, 0.0);
    +      //TODO clean this up some more
    --- End diff --
    
    Todo



[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2113#discussion_r120688356
  
    --- Diff: storm-server/src/test/java/org/apache/storm/daemon/nimbus/NimbusTest.java ---
    @@ -24,8 +24,10 @@
     import org.apache.storm.topology.TopologyBuilder;
     import org.junit.Test;
     
    +import static org.junit.Assert.*;
    --- End diff --
    
    Checkstyle does not check tests for some reason, but if you care about it I can change it.



[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2113#discussion_r120669238
  
    --- Diff: storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java ---
    @@ -144,8 +148,19 @@ public Void call() throws Exception {
                     File tr = new File(tmproot);
                     try {
                         downloadBaseBlobs(tr);
    +                    if (_assignment.is_set_total_node_shared()) {
    +                        File sharedMemoryDirTmpLocation = new File(tr, "shared_by_topology");
    +                        //We need to create a directory for shared memory to write to (we should not encourage this though)
    +                        Path path = sharedMemoryDirTmpLocation.toPath();
    +                        Files.createDirectories(path);
    +                    }
                         _fsOps.moveDirectoryPreferAtomic(tr, _stormRoot);
    -                    _fsOps.setupStormCodeDir(ConfigUtils.readSupervisorStormConf(_conf, _topologyId), _stormRoot);
    +                    Map<String, Object> topoConf = ConfigUtils.readSupervisorStormConf(_conf, _topologyId);
    +                    _fsOps.setupStormCodeDir(topoConf, _stormRoot);
    +                    if (_assignment.is_has_node_shared_memory()) {
    +                        File sharedMemoryDir = new File(_stormRoot, "shared_by_topology");
    --- End diff --
    
    Okay. I was just thinking it might be helpful if the topology could get the directory path (or _stormRoot), so users don't have to work out the path to this directory themselves.



[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2113#discussion_r120458603
  
    --- Diff: storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java ---
    @@ -144,8 +148,19 @@ public Void call() throws Exception {
                     File tr = new File(tmproot);
                     try {
                         downloadBaseBlobs(tr);
    +                    if (_assignment.is_set_total_node_shared()) {
    +                        File sharedMemoryDirTmpLocation = new File(tr, "shared_by_topology");
    +                        //We need to create a directory for shared memory to write to (we should not encourage this though)
    +                        Path path = sharedMemoryDirTmpLocation.toPath();
    +                        Files.createDirectories(path);
    +                    }
                         _fsOps.moveDirectoryPreferAtomic(tr, _stormRoot);
    -                    _fsOps.setupStormCodeDir(ConfigUtils.readSupervisorStormConf(_conf, _topologyId), _stormRoot);
    +                    Map<String, Object> topoConf = ConfigUtils.readSupervisorStormConf(_conf, _topologyId);
    +                    _fsOps.setupStormCodeDir(topoConf, _stormRoot);
    +                    if (_assignment.is_has_node_shared_memory()) {
    +                        File sharedMemoryDir = new File(_stormRoot, "shared_by_topology");
    --- End diff --
    
    I might have missed it somewhere, but how does the topology code access the off heap shared memory?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2113#discussion_r120656010
  
    --- Diff: storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java ---
    @@ -528,28 +577,108 @@ public void cleanUpForRestart() throws IOException {
             deleteSavedWorkerUser();
             _workerId = null;
         }
    -    
    +
    +    /**
    +     * Check if the container is over its memory limit AND needs to be killed. This does not necessarily mean
    +     * that it just went over the limit.
    +     * @throws IOException on any error
    +     */
    +    public boolean isMemoryLimitViolated(LocalAssignment withUpdatedLimits) throws IOException {
    --- End diff --
    
    Yes, but it is overridden by other Container implementations, and they may want to throw an exception.
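
The constraint behind this reply can be sketched as follows. In Java an override may not declare checked exceptions that the overridden method does not, so the base method has to declare `IOException` even when its own body never throws. `SketchContainer` and `ProbingContainer` are hypothetical names, not Storm classes, and the two `long` parameters are a simplification of the real signature.

```java
import java.io.IOException;

public class OverrideThrowsSketch {
    // Hypothetical base class: the body never throws, but the
    // declaration leaves room for subclasses that do I/O.
    static class SketchContainer {
        public boolean isMemoryLimitViolated(long usedMb, long limitMb) throws IOException {
            return usedMb > limitMb;
        }
    }

    // Hypothetical subclass that could fail while probing memory usage.
    static class ProbingContainer extends SketchContainer {
        @Override
        public boolean isMemoryLimitViolated(long usedMb, long limitMb) throws IOException {
            if (usedMb < 0) {
                // Stand-in for a failed read of memory usage from the OS.
                throw new IOException("could not read memory usage");
            }
            return super.isMemoryLimitViolated(usedMb, limitMb);
        }
    }
}
```

Removing `throws IOException` from the base method would turn the subclass declaration into a compile error.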



[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2113#discussion_r120658098
  
    --- Diff: storm-server/src/main/java/org/apache/storm/localizer/AsyncLocalizer.java ---
    @@ -144,8 +148,19 @@ public Void call() throws Exception {
                     File tr = new File(tmproot);
                     try {
                         downloadBaseBlobs(tr);
    +                    if (_assignment.is_set_total_node_shared()) {
    +                        File sharedMemoryDirTmpLocation = new File(tr, "shared_by_topology");
    +                        //We need to create a directory for shared memory to write to (we should not encourage this though)
    +                        Path path = sharedMemoryDirTmpLocation.toPath();
    +                        Files.createDirectories(path);
    +                    }
                         _fsOps.moveDirectoryPreferAtomic(tr, _stormRoot);
    -                    _fsOps.setupStormCodeDir(ConfigUtils.readSupervisorStormConf(_conf, _topologyId), _stormRoot);
    +                    Map<String, Object> topoConf = ConfigUtils.readSupervisorStormConf(_conf, _topologyId);
    +                    _fsOps.setupStormCodeDir(topoConf, _stormRoot);
    +                    if (_assignment.is_has_node_shared_memory()) {
    +                        File sharedMemoryDir = new File(_stormRoot, "shared_by_topology");
    --- End diff --
    
    That is up to the topology.  It could be JNI code or even something that a ShellBolt or ShellSpout does.  The point is to provide a place for them to store the data if they need it.  We have a group doing this in /tmp right now, and it causes all kinds of issues when workers crash or get rescheduled and leave things behind.



[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2113#discussion_r120646636
  
    --- Diff: storm-client/src/jvm/org/apache/storm/topology/BaseConfigurationDeclarer.java ---
    @@ -52,18 +52,20 @@ public T setNumTasks(Number val) {
             return addConfiguration(Config.TOPOLOGY_TASKS, val);
         }
     
    +    @SuppressWarnings("unchecked")
         @Override
         public T setMemoryLoad(Number onHeap) {
             if (onHeap != null) {
    --- End diff --
    
    I agree, but that code predates this change. If we want to change it I would prefer to do so in a separate pull request, so that a breaking change does not sneak in as part of review comments.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2113#discussion_r120444024
  
    --- Diff: storm-server/src/main/java/org/apache/storm/DaemonConfig.java ---
    @@ -889,10 +890,83 @@
         public static String STORM_CGROUP_CGEXEC_CMD = "storm.cgroup.cgexec.cmd";
     
         /**
    -     * The amount of memory a worker can exceed its allocation before cgroup will kill it
    +     * Please use STORM_SUPERVISOR_MEMORY_LIMIT_TOLERANCE_MARGIN_MB instead. The amount of memory a
    +     * worker can exceed its allocation before cgroup will kill it.
    +     */
    +    @isPositiveNumber(includeZero = true)
    +    public static String STORM_CGROUP_MEMORY_LIMIT_TOLERANCE_MARGIN_MB =
    +        "storm.cgroup.memory.limit.tolerance.margin.mb";
    +
    +    /**
    +     * Java does not always play nicely with cgroups. It is coming but not fully implemented and not
    +     * for the way storm uses cgroups. In the short term you can disable the hard memory enforcement
    +     * by cgroups and let the supervisor handle shooting workers going over their limit in a kinder
    +     * way.
    +     */
    +    @isBoolean
    +    public static String STORM_CGROUP_MEMORY_ENFORCEMENT_ENABLE = "storm.cgroup.memory.enforcement.enable";
    +
    +    // Configs for memory enforcement does by the supervisor (not cgroups directly)
    --- End diff --
    
    nit: does -> done(?)



[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2113#discussion_r120453941
  
    --- Diff: storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java ---
    @@ -528,28 +577,108 @@ public void cleanUpForRestart() throws IOException {
             deleteSavedWorkerUser();
             _workerId = null;
         }
    -    
    +
    +    /**
    +     * Check if the container is over its memory limit AND needs to be killed. This does not necessarily mean
    +     * that it just went over the limit.
    +     * @throws IOException on any error
    +     */
    +    public boolean isMemoryLimitViolated(LocalAssignment withUpdatedLimits) throws IOException {
    --- End diff --
    
    This doesn't seem like it can throw IOException


---

[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2113#discussion_r120430454
  
    --- Diff: storm-client/src/jvm/org/apache/storm/drpc/LinearDRPCTopologyBuilder.java ---
    @@ -389,5 +397,11 @@ public LinearDRPCInputDeclarer addConfigurations(Map<String, Object> conf) {
                 _component.componentConfs.add(conf);
                 return this;
             }
    +
    +        @Override
    +        public LinearDRPCInputDeclarer addSharedMemory(SharedMemory request) {
    +            _component.sharedMemory.add(request);
    +            return null;
    --- End diff --
    
    Should this return null?
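
A minimal sketch of why the return value matters, assuming the method is meant to be chained the same way `addConfigurations` in the hunk above is (it ends with `return this`). `SketchDeclarer` and `FluentReturnDemo` are hypothetical stand-ins, not Storm classes:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a fluent declarer; not a Storm class.
class SketchDeclarer {
    final List<String> sharedMemory = new ArrayList<>();
    final List<String> confs = new ArrayList<>();

    // Returning `this` keeps the call chain usable.
    SketchDeclarer addSharedMemory(String request) {
        sharedMemory.add(request);
        return this;
    }

    SketchDeclarer addConfiguration(String conf) {
        confs.add(conf);
        return this;
    }

    // Same side effect, but returns null: any chained call on the result fails.
    SketchDeclarer addSharedMemoryReturningNull(String request) {
        sharedMemory.add(request);
        return null;
    }
}

class FluentReturnDemo {
    // Returns true if chaining after the null-returning variant throws a NullPointerException.
    static boolean chainBreaksOnNull() {
        try {
            new SketchDeclarer().addSharedMemoryReturningNull("region").addConfiguration("x");
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }
}
```

With `return this`, a caller can write `declarer.addSharedMemory(...).addConfiguration(...)`; with `return null`, the second call in that chain throws.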


---

[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2113#discussion_r120437166
  
    --- Diff: storm-client/src/jvm/org/apache/storm/scheduler/WorkerSlot.java ---
    @@ -20,23 +20,12 @@
     public class WorkerSlot {
         private String nodeId;
    --- End diff --
    
    Nit: Can these be final?
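
A minimal sketch of what final fields would look like, assuming the fields are only ever assigned in the constructor. `SlotSketch` is a hypothetical class, and the `port` field is an assumption (the quoted hunk shows only `nodeId`):

```java
import java.util.Objects;

// Hypothetical value class; not Storm's WorkerSlot.
final class SlotSketch {
    private final String nodeId; // assigned once in the constructor
    private final int port;      // assumed second field, for illustration only

    SlotSketch(String nodeId, int port) {
        this.nodeId = Objects.requireNonNull(nodeId, "nodeId");
        this.port = port;
    }

    String getNodeId() {
        return nodeId;
    }

    int getPort() {
        return port;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof SlotSketch)) {
            return false;
        }
        SlotSketch other = (SlotSketch) o;
        return port == other.port && nodeId.equals(other.nodeId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(nodeId, port);
    }
}
```

Because the fields cannot change after construction, `equals` and `hashCode` stay stable while an instance is used as a map key.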


---

[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2113#discussion_r120687251
  
    --- Diff: storm-client/src/jvm/org/apache/storm/scheduler/SchedulerAssignment.java ---
    @@ -21,41 +21,52 @@
     import java.util.Map;
     import java.util.Set;
     
    +import org.apache.storm.generated.Assignment;
    +import org.apache.storm.generated.WorkerResources;
    +
     public interface SchedulerAssignment {
         /**
          * Does this slot occupied by this assignment?
          * @param slot
    -     * @return
    +     * @return true if the slot is occupied else false
          */
         public boolean isSlotOccupied(WorkerSlot slot);
     
         /**
    -     * is the executor assigned?
    +     * Is the executor assigned?
          * 
          * @param executor
    -     * @return
    +     * @return true if it is assigned else false
          */
         public boolean isExecutorAssigned(ExecutorDetails executor);
         
         /**
    -     * get the topology-id this assignment is for.
    -     * @return
    +     * Return the ID of the topolgoy.
    --- End diff --
    
    I even typo my fixes.


---

[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2113#discussion_r120456435
  
    --- Diff: storm-server/src/test/java/org/apache/storm/daemon/nimbus/NimbusTest.java ---
    @@ -0,0 +1,49 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.daemon.nimbus;
    +
    +import org.apache.storm.Config;
    +import org.apache.storm.DaemonConfig;
    +import org.apache.storm.generated.StormTopology;
    +import org.apache.storm.testing.TestWordSpout;
    +import org.apache.storm.topology.TopologyBuilder;
    +import org.junit.Test;
    +
    +public class NimbusTest {
    +    @Test(expected=IllegalArgumentException.class)
    --- End diff --
    
    Nit: The scope of this assertion could be narrowed to the call that is expected to throw by using the ExpectedException rule
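
A minimal sketch of the idea in plain Java, without JUnit on the classpath. `@Test(expected=...)` passes if the exception is thrown anywhere in the test method, including setup; checking a single call is narrower. The helper `throwsExpected` and the `validate` method are hypothetical; in a real JUnit 4 test the `ExpectedException` rule (or `assertThrows` in JUnit 4.13 and later) would play the helper's role:

```java
// Hypothetical helper class; stands in for what a test framework would provide.
class ScopedExpectation {
    // Returns true only if this one call throws an instance of the expected type.
    static boolean throwsExpected(Class<? extends Throwable> expected, Runnable call) {
        try {
            call.run();
        } catch (Throwable t) {
            return expected.isInstance(t);
        }
        return false; // the call completed normally
    }

    // Stand-in for the code under test: rejects a negative memory value.
    static void validate(int memoryMb) {
        if (memoryMb < 0) {
            throw new IllegalArgumentException("memory must be >= 0: " + memoryMb);
        }
    }
}
```

Setup code runs outside the lambda passed to `throwsExpected`, so an `IllegalArgumentException` raised during setup surfaces as a failure instead of being mistaken for the expected result.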


---

[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2113#discussion_r120446703
  
    --- Diff: storm-server/src/main/java/org/apache/storm/container/cgroup/CgroupManager.java ---
    @@ -151,22 +161,40 @@ public void reserveResourcesForWorker(String workerId, Map<String, Number> resou
                 }
             }
     
    -        if (totalMem != null) {
    -            MemoryCore memCore = (MemoryCore) workerGroup.getCores().get(SubSystemType.memory);
    -            try {
    -                memCore.setPhysicalUsageLimit(Long.valueOf(totalMem.longValue() * 1024 * 1024));
    -            } catch (IOException e) {
    -                throw new RuntimeException("Cannot set memory.limit_in_bytes! Exception: ", e);
    +        if ((boolean) this.conf.get(DaemonConfig.STORM_CGROUP_MEMORY_ENFORCEMENT_ENABLE)) {
    +            if (totalMem != null) {
    +                int cgroupMem =
    +                    (int)
    +                        (Math.ceil(
    +                            ObjectReader.getDouble(
    +                                this.conf.get(DaemonConfig.STORM_CGROUP_MEMORY_LIMIT_TOLERANCE_MARGIN_MB),
    +                                0.0)));
    +                long memLimit = Long.valueOf((totalMem.longValue() + cgroupMem) * 1024 * 1024);
    +                MemoryCore memCore = (MemoryCore) workerGroup.getCores().get(SubSystemType.memory);
    +                try {
    +                    memCore.setPhysicalUsageLimit(memLimit);
    +                } catch (IOException e) {
    +                    throw new RuntimeException("Cannot set memory.limit_in_bytes! Exception: ", e);
    +                }
    +                // need to set memory.memsw.limit_in_bytes after setting memory.limit_in_bytes or error
    +                // might occur
    +                try {
    +                    memCore.setWithSwapUsageLimit(memLimit);
    +                } catch (IOException e) {
    +                    throw new RuntimeException("Cannot set memory.memsw.limit_in_bytes! Exception: ", e);
    +                }
                 }
             }
         }
     
    +    @Override
         public void releaseResourcesForWorker(String workerId) {
             CgroupCommon workerGroup = new CgroupCommon(workerId, hierarchy, this.rootCgroup);
             try {
                 Set<Integer> tasks = workerGroup.getTasks();
                 if (!tasks.isEmpty()) {
    -                throw new Exception("Cannot correctly showdown worker CGroup " + workerId + "tasks " + tasks.toString() + " still running!");
    +                throw new Exception("Cannot correctly showdown worker CGroup " + workerId + "tasks " + tasks
    --- End diff --
    
    nit: showdown -> shutdown
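
For readers following the hunk above, the limit it computes can be sketched as below: the tolerance margin is rounded up to whole megabytes, added to the worker's total memory, and converted to bytes. `CgroupLimitSketch` and `memLimitBytes` are hypothetical names; in the hunk the same value is then passed to both `setPhysicalUsageLimit` and `setWithSwapUsageLimit`, in that order:

```java
// Hypothetical helper mirroring the arithmetic in the quoted hunk; not Storm code.
class CgroupLimitSketch {
    static long memLimitBytes(Number totalMemMb, double toleranceMarginMb) {
        // Round a fractional margin up to whole megabytes, as the hunk does with Math.ceil.
        int marginMb = (int) Math.ceil(toleranceMarginMb);
        // Long arithmetic throughout, so large MB values do not overflow an int.
        return (totalMemMb.longValue() + marginMb) * 1024L * 1024L;
    }
}
```

For example, 512 MB of worker memory with a 0.5 MB margin yields the byte count for 513 MB.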


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm issue #2113: STORM-2497: Let Supervisor enforce memory and add in supp...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/2113
  
    +1, though I'll admit to just skimming it this time. It would be easier to tell what changed if the non-merge-conflict-resolution changes were in separate commits.
    
    It might make sense to drop a line to @roshannaik before merging; I think he asked on the mailing list to be made aware of changes to storm-client due to https://github.com/apache/storm/pull/2241


---

[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2113#discussion_r120647251
  
    --- Diff: storm-client/src/jvm/org/apache/storm/topology/BaseConfigurationDeclarer.java ---
    @@ -52,18 +52,20 @@ public T setNumTasks(Number val) {
             return addConfiguration(Config.TOPOLOGY_TASKS, val);
         }
     
    +    @SuppressWarnings("unchecked")
         @Override
         public T setMemoryLoad(Number onHeap) {
             if (onHeap != null) {
    --- End diff --
    
    I filed https://issues.apache.org/jira/browse/STORM-2545 for it


---

[GitHub] storm pull request #2113: STORM-2497: Let Supervisor enforce memory and add ...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2113#discussion_r120451243
  
    --- Diff: storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java ---
    @@ -728,8 +758,145 @@ protected String javaCmd(String cmd) {
             
             return commandList;
         }
    +    
    +  @Override
    +  public boolean isMemoryLimitViolated(LocalAssignment withUpdatedLimits) throws IOException {
    +    if (super.isMemoryLimitViolated(withUpdatedLimits)) {
    +      return true;
    +    }
    +    if (_resourceIsolationManager != null) {
    +      // In the short term the goal is to not shoot anyone unless we really need to.
    +      // The on heap should limit the memory usage in most cases to a reasonable amount
    +      // If someone is using way more than they requested this is a bug and we should
    +      // not allow it
    +      long usageMb;
    +      long memoryLimitMb;
    +      long hardMemoryLimitOver;
    +      String typeOfCheck;
    +
    +      if (withUpdatedLimits.is_has_node_shared_memory()) {
    +        //We need to do enforcement on a topology level, not a single worker level...
    +        // Because in for cgroups each page in shared memory goes to the worker that touched it
    +        // first. We may need to make this more plugable in the future and let the resource
    +        // isolation manager tell us what to do
    +        usageMb = getTotalTopologyMemoryUsed();
    +        memoryLimitMb = getTotalTopologyMemoryReserved(withUpdatedLimits);
    +        hardMemoryLimitOver = this.hardMemoryLimitOver * getTotalWorkersForThisTopology();
    +        typeOfCheck = "TOPOLOGY " + _topologyId;
    +      } else {
    +        usageMb = getMemoryUsageMb();
    +        memoryLimitMb = this.memoryLimitMB;
    +        hardMemoryLimitOver = this.hardMemoryLimitOver;
    +        typeOfCheck = "WORKER " + _workerId;
    +      }
    +      LOG.debug(
    +          "Enforcing memory usage for {} with usgae of {} out of {} total and a hard limit of {}",
    --- End diff --
    
    usgae -> usage
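
A minimal sketch of the scope selection in the hunk above: with node-level shared memory the check covers the whole topology and the per-worker hard overage is multiplied by the number of workers, otherwise it covers a single worker. `EnforcementScope` and its factory methods are hypothetical; the comparison that follows the `LOG.debug` call is cut off in the quoted diff and is not sketched here:

```java
// Hypothetical holder; field names mirror the local variables in the quoted hunk.
final class EnforcementScope {
    final long usageMb;
    final long memoryLimitMb;
    final long hardMemoryLimitOver;
    final String typeOfCheck;

    private EnforcementScope(long usageMb, long memoryLimitMb, long hardMemoryLimitOver, String typeOfCheck) {
        this.usageMb = usageMb;
        this.memoryLimitMb = memoryLimitMb;
        this.hardMemoryLimitOver = hardMemoryLimitOver;
        this.typeOfCheck = typeOfCheck;
    }

    // Single-worker check: the worker's own usage and limits are used as-is.
    static EnforcementScope forWorker(long usageMb, long limitMb, long hardOver, String workerId) {
        return new EnforcementScope(usageMb, limitMb, hardOver, "WORKER " + workerId);
    }

    // Topology-wide check: totals across workers, with the hard overage scaled by worker count.
    static EnforcementScope forTopology(long totalUsageMb, long totalReservedMb,
                                        long perWorkerHardOver, int workers, String topologyId) {
        return new EnforcementScope(totalUsageMb, totalReservedMb,
                perWorkerHardOver * workers, "TOPOLOGY " + topologyId);
    }
}
```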


---