Posted to dev@storm.apache.org by Ethanlm <gi...@git.apache.org> on 2018/04/05 14:27:54 UTC

[GitHub] storm pull request #2623: [STORM-2687] Group Topology executors by network p...

GitHub user Ethanlm opened a pull request:

    https://github.com/apache/storm/pull/2623

    [STORM-2687] Group Topology executors by network proximity needs and schedule them on network wise close slots

    https://issues.apache.org/jira/browse/STORM-2687
    
    This tries to schedule a topology's upstream and downstream executors close to each other on the network.
    
    Performance testing of this change is still in progress.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/Ethanlm/storm STORM-2687

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/2623.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2623
    
----
commit 04b9aa289713e5b3ce67edb8526976959a12a41c
Author: Ethan Li <et...@...>
Date:   2018-02-20T22:04:02Z

    [STORM-2687] Group Topology executors by network proximity needs and schedule them on network wise close slots

----


---

[GitHub] storm pull request #2623: [STORM-2687] Group Topology executors by network p...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2623#discussion_r179569132
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java ---
    @@ -477,45 +414,136 @@ protected String nodeToRack(RAS_Node node) {
             List<ExecutorDetails> execsScheduled = new LinkedList<>();
     
             Map<String, Queue<ExecutorDetails>> compToExecsToSchedule = new HashMap<>();
    -        for (Component component : componentMap.values()) {
    -            compToExecsToSchedule.put(component.getId(), new LinkedList<ExecutorDetails>());
    +        for (Map.Entry<String, Component> componentEntry: componentMap.entrySet()) {
    +            Component component = componentEntry.getValue();
    +            compToExecsToSchedule.put(component.getId(), new LinkedList<>());
                 for (ExecutorDetails exec : component.getExecs()) {
                     if (unassignedExecutors.contains(exec)) {
                         compToExecsToSchedule.get(component.getId()).add(exec);
    +                    LOG.info("{} has unscheduled executor {}", component.getId(), exec);
    --- End diff --
    
    Could we remove this, please? I'm not sure it is needed anymore.


---

[GitHub] storm pull request #2623: [STORM-2687] Group Topology executors by network p...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2623#discussion_r179570190
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java ---
    @@ -477,45 +414,136 @@ protected String nodeToRack(RAS_Node node) {
             List<ExecutorDetails> execsScheduled = new LinkedList<>();
     
             Map<String, Queue<ExecutorDetails>> compToExecsToSchedule = new HashMap<>();
    -        for (Component component : componentMap.values()) {
    -            compToExecsToSchedule.put(component.getId(), new LinkedList<ExecutorDetails>());
    +        for (Map.Entry<String, Component> componentEntry: componentMap.entrySet()) {
    +            Component component = componentEntry.getValue();
    +            compToExecsToSchedule.put(component.getId(), new LinkedList<>());
                 for (ExecutorDetails exec : component.getExecs()) {
                     if (unassignedExecutors.contains(exec)) {
                         compToExecsToSchedule.get(component.getId()).add(exec);
    +                    LOG.info("{} has unscheduled executor {}", component.getId(), exec);
                     }
                 }
             }
     
    -        Set<Component> sortedComponents = sortComponents(componentMap);
    -        sortedComponents.addAll(componentMap.values());
    +        List<Component> sortedComponents = topologicalSortComponents(componentMap);
     
    -        for (Component currComp : sortedComponents) {
    -            Map<String, Component> neighbors = new HashMap<String, Component>();
    -            for (String compId : Sets.union(currComp.getChildren(), currComp.getParents())) {
    -                neighbors.put(compId, componentMap.get(compId));
    +        for (Component currComp: sortedComponents) {
    +            int numExecs = compToExecsToSchedule.get(currComp.getId()).size();
    +            for (int i = 0; i < numExecs; i++) {
    +                execsScheduled.addAll(takeExecutors(currComp, numExecs - i, componentMap, compToExecsToSchedule));
                 }
    -            Set<Component> sortedNeighbors = sortNeighbors(currComp, neighbors);
    -            Queue<ExecutorDetails> currCompExesToSched = compToExecsToSchedule.get(currComp.getId());
    -
    -            boolean flag = false;
    -            do {
    -                flag = false;
    -                if (!currCompExesToSched.isEmpty()) {
    -                    execsScheduled.add(currCompExesToSched.poll());
    -                    flag = true;
    -                }
    +        }
    +
    +        LOG.info("The ordering result is {}", execsScheduled);
    +
    +        return execsScheduled;
    +    }
     
    -                for (Component neighborComp : sortedNeighbors) {
    -                    Queue<ExecutorDetails> neighborCompExesToSched =
    -                        compToExecsToSchedule.get(neighborComp.getId());
    -                    if (!neighborCompExesToSched.isEmpty()) {
    -                        execsScheduled.add(neighborCompExesToSched.poll());
    -                        flag = true;
    +    private List<ExecutorDetails> takeExecutors(Component currComp, int numExecs,
    +                                                final Map<String, Component> componentMap,
    +                                                final Map<String, Queue<ExecutorDetails>> compToExecsToSchedule) {
    +        List<ExecutorDetails> execsScheduled = new ArrayList<>();
    +        Queue<ExecutorDetails> currQueue = compToExecsToSchedule.get((currComp.getId()));
    --- End diff --
    
    Nit: there is an extra, unneeded pair of parentheses around `currComp.getId()`.


---

[GitHub] storm pull request #2623: [STORM-2687] Group Topology executors by network p...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2623#discussion_r179568920
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java ---
    @@ -477,45 +414,136 @@ protected String nodeToRack(RAS_Node node) {
             List<ExecutorDetails> execsScheduled = new LinkedList<>();
     
             Map<String, Queue<ExecutorDetails>> compToExecsToSchedule = new HashMap<>();
    -        for (Component component : componentMap.values()) {
    -            compToExecsToSchedule.put(component.getId(), new LinkedList<ExecutorDetails>());
    +        for (Map.Entry<String, Component> componentEntry: componentMap.entrySet()) {
    --- End diff --
    
    Nit: we only use the Component from each entry and never the key. Could we go back to just looping through the values like before?
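
A minimal sketch of the loop shape this comment asks for, iterating over `values()` because the map key is never read. `SketchComponent`, `ValuesLoopSketch`, and the use of `String` for executors are hypothetical stand-ins for illustration, not Storm's actual classes.

```java
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

// Simplified stand-in for Storm's Component; only the fields this sketch needs.
final class SketchComponent {
    private final String id;
    private final List<String> execs;

    SketchComponent(String id, List<String> execs) {
        this.id = id;
        this.execs = execs;
    }

    String getId() {
        return id;
    }

    List<String> getExecs() {
        return execs;
    }
}

final class ValuesLoopSketch {
    // Builds one queue of unassigned executors per component.
    // The loop walks componentMap.values() since the key is not needed.
    static Map<String, Queue<String>> buildQueues(Map<String, SketchComponent> componentMap,
                                                  Set<String> unassignedExecutors) {
        Map<String, Queue<String>> compToExecsToSchedule = new HashMap<>();
        for (SketchComponent component : componentMap.values()) {
            Queue<String> queue = new LinkedList<>();
            for (String exec : component.getExecs()) {
                if (unassignedExecutors.contains(exec)) {
                    queue.add(exec);
                }
            }
            compToExecsToSchedule.put(component.getId(), queue);
        }
        return compToExecsToSchedule;
    }
}
```

Building the queue first and calling `put` once also avoids the repeated `compToExecsToSchedule.get(component.getId())` lookup inside the inner loop.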


---

[GitHub] storm pull request #2623: [STORM-2687] Group Topology executors by network p...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2623#discussion_r179569824
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java ---
    @@ -477,45 +414,136 @@ protected String nodeToRack(RAS_Node node) {
             List<ExecutorDetails> execsScheduled = new LinkedList<>();
     
             Map<String, Queue<ExecutorDetails>> compToExecsToSchedule = new HashMap<>();
    -        for (Component component : componentMap.values()) {
    -            compToExecsToSchedule.put(component.getId(), new LinkedList<ExecutorDetails>());
    +        for (Map.Entry<String, Component> componentEntry: componentMap.entrySet()) {
    +            Component component = componentEntry.getValue();
    +            compToExecsToSchedule.put(component.getId(), new LinkedList<>());
                 for (ExecutorDetails exec : component.getExecs()) {
                     if (unassignedExecutors.contains(exec)) {
                         compToExecsToSchedule.get(component.getId()).add(exec);
    +                    LOG.info("{} has unscheduled executor {}", component.getId(), exec);
                     }
                 }
             }
     
    -        Set<Component> sortedComponents = sortComponents(componentMap);
    -        sortedComponents.addAll(componentMap.values());
    +        List<Component> sortedComponents = topologicalSortComponents(componentMap);
     
    -        for (Component currComp : sortedComponents) {
    -            Map<String, Component> neighbors = new HashMap<String, Component>();
    -            for (String compId : Sets.union(currComp.getChildren(), currComp.getParents())) {
    -                neighbors.put(compId, componentMap.get(compId));
    +        for (Component currComp: sortedComponents) {
    +            int numExecs = compToExecsToSchedule.get(currComp.getId()).size();
    +            for (int i = 0; i < numExecs; i++) {
    +                execsScheduled.addAll(takeExecutors(currComp, numExecs - i, componentMap, compToExecsToSchedule));
                 }
    -            Set<Component> sortedNeighbors = sortNeighbors(currComp, neighbors);
    -            Queue<ExecutorDetails> currCompExesToSched = compToExecsToSchedule.get(currComp.getId());
    -
    -            boolean flag = false;
    -            do {
    -                flag = false;
    -                if (!currCompExesToSched.isEmpty()) {
    -                    execsScheduled.add(currCompExesToSched.poll());
    -                    flag = true;
    -                }
    +        }
    +
    +        LOG.info("The ordering result is {}", execsScheduled);
    +
    +        return execsScheduled;
    +    }
     
    -                for (Component neighborComp : sortedNeighbors) {
    -                    Queue<ExecutorDetails> neighborCompExesToSched =
    -                        compToExecsToSchedule.get(neighborComp.getId());
    -                    if (!neighborCompExesToSched.isEmpty()) {
    -                        execsScheduled.add(neighborCompExesToSched.poll());
    -                        flag = true;
    +    private List<ExecutorDetails> takeExecutors(Component currComp, int numExecs,
    --- End diff --
    
    Could you add a Javadoc comment to this explaining what it is trying to do? It is not obvious from the code alone.
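
As an illustration of the kind of Javadoc being requested, here is a sketch covering only the child-count rule that appears in the `takeExecutors` body quoted elsewhere in this thread (`Math.max(1, childNumExecs / numExecs)` for shuffle grouping, otherwise 1). `TakeCountSketch` and `numExecsToTake` are hypothetical names, and the wording is one reading of the diff, not the author's documentation.

```java
final class TakeCountSketch {
    /**
     * Returns how many rounds of child executors to take for a single
     * executor of the parent component.
     *
     * <p>If the parent feeds the child through a shuffle grouping, the
     * child's remaining executors are spread over the parent's remaining
     * executors, rounding down but never below one. For any other grouping
     * exactly one is taken.
     *
     * @param shuffleFromParent whether the parent-to-child grouping is shuffle
     * @param childNumExecs     executors of the child still unscheduled
     * @param numExecs          executors of the parent still unscheduled; must be positive
     * @return the number of rounds to take, always at least 1
     */
    static int numExecsToTake(boolean shuffleFromParent, int childNumExecs, int numExecs) {
        if (shuffleFromParent) {
            // Integer division truncates, e.g. 7 / 2 == 3.
            return Math.max(1, childNumExecs / numExecs);
        }
        return 1;
    }
}
```

For example, with 7 unscheduled child executors and 2 unscheduled parent executors under shuffle grouping, the rule yields 3; with 1 child executor and 4 parent executors it yields 1.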


---

[GitHub] storm pull request #2623: [STORM-2687] Group Topology executors by network p...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2623#discussion_r179569508
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java ---
    @@ -477,45 +414,136 @@ protected String nodeToRack(RAS_Node node) {
             List<ExecutorDetails> execsScheduled = new LinkedList<>();
     
             Map<String, Queue<ExecutorDetails>> compToExecsToSchedule = new HashMap<>();
    -        for (Component component : componentMap.values()) {
    -            compToExecsToSchedule.put(component.getId(), new LinkedList<ExecutorDetails>());
    +        for (Map.Entry<String, Component> componentEntry: componentMap.entrySet()) {
    +            Component component = componentEntry.getValue();
    +            compToExecsToSchedule.put(component.getId(), new LinkedList<>());
                 for (ExecutorDetails exec : component.getExecs()) {
                     if (unassignedExecutors.contains(exec)) {
                         compToExecsToSchedule.get(component.getId()).add(exec);
    +                    LOG.info("{} has unscheduled executor {}", component.getId(), exec);
                     }
                 }
             }
     
    -        Set<Component> sortedComponents = sortComponents(componentMap);
    -        sortedComponents.addAll(componentMap.values());
    +        List<Component> sortedComponents = topologicalSortComponents(componentMap);
     
    -        for (Component currComp : sortedComponents) {
    -            Map<String, Component> neighbors = new HashMap<String, Component>();
    -            for (String compId : Sets.union(currComp.getChildren(), currComp.getParents())) {
    -                neighbors.put(compId, componentMap.get(compId));
    +        for (Component currComp: sortedComponents) {
    +            int numExecs = compToExecsToSchedule.get(currComp.getId()).size();
    +            for (int i = 0; i < numExecs; i++) {
    +                execsScheduled.addAll(takeExecutors(currComp, numExecs - i, componentMap, compToExecsToSchedule));
                 }
    -            Set<Component> sortedNeighbors = sortNeighbors(currComp, neighbors);
    -            Queue<ExecutorDetails> currCompExesToSched = compToExecsToSchedule.get(currComp.getId());
    -
    -            boolean flag = false;
    -            do {
    -                flag = false;
    -                if (!currCompExesToSched.isEmpty()) {
    -                    execsScheduled.add(currCompExesToSched.poll());
    -                    flag = true;
    -                }
    +        }
    +
    +        LOG.info("The ordering result is {}", execsScheduled);
    --- End diff --
    
    Could we remove this too?


---

[GitHub] storm pull request #2623: [STORM-2687] Group Topology executors by network p...

Posted by Ethanlm <gi...@git.apache.org>.
Github user Ethanlm commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2623#discussion_r179772853
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java ---
    @@ -477,45 +414,136 @@ protected String nodeToRack(RAS_Node node) {
             List<ExecutorDetails> execsScheduled = new LinkedList<>();
     
             Map<String, Queue<ExecutorDetails>> compToExecsToSchedule = new HashMap<>();
    -        for (Component component : componentMap.values()) {
    -            compToExecsToSchedule.put(component.getId(), new LinkedList<ExecutorDetails>());
    +        for (Map.Entry<String, Component> componentEntry: componentMap.entrySet()) {
    +            Component component = componentEntry.getValue();
    +            compToExecsToSchedule.put(component.getId(), new LinkedList<>());
                 for (ExecutorDetails exec : component.getExecs()) {
                     if (unassignedExecutors.contains(exec)) {
                         compToExecsToSchedule.get(component.getId()).add(exec);
    +                    LOG.info("{} has unscheduled executor {}", component.getId(), exec);
    --- End diff --
    
    I will remove it before merging. I wanted to keep it for debugging for now.


---

[GitHub] storm issue #2623: [STORM-2687] Group Topology executors by network proximit...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/2623
  
    @Ethanlm Do you have any updates?


---

[GitHub] storm pull request #2623: [STORM-2687] Group Topology executors by network p...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2623#discussion_r179570567
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java ---
    @@ -477,45 +414,136 @@ protected String nodeToRack(RAS_Node node) {
             List<ExecutorDetails> execsScheduled = new LinkedList<>();
     
             Map<String, Queue<ExecutorDetails>> compToExecsToSchedule = new HashMap<>();
    -        for (Component component : componentMap.values()) {
    -            compToExecsToSchedule.put(component.getId(), new LinkedList<ExecutorDetails>());
    +        for (Map.Entry<String, Component> componentEntry: componentMap.entrySet()) {
    +            Component component = componentEntry.getValue();
    +            compToExecsToSchedule.put(component.getId(), new LinkedList<>());
                 for (ExecutorDetails exec : component.getExecs()) {
                     if (unassignedExecutors.contains(exec)) {
                         compToExecsToSchedule.get(component.getId()).add(exec);
    +                    LOG.info("{} has unscheduled executor {}", component.getId(), exec);
                     }
                 }
             }
     
    -        Set<Component> sortedComponents = sortComponents(componentMap);
    -        sortedComponents.addAll(componentMap.values());
    +        List<Component> sortedComponents = topologicalSortComponents(componentMap);
     
    -        for (Component currComp : sortedComponents) {
    -            Map<String, Component> neighbors = new HashMap<String, Component>();
    -            for (String compId : Sets.union(currComp.getChildren(), currComp.getParents())) {
    -                neighbors.put(compId, componentMap.get(compId));
    +        for (Component currComp: sortedComponents) {
    +            int numExecs = compToExecsToSchedule.get(currComp.getId()).size();
    +            for (int i = 0; i < numExecs; i++) {
    +                execsScheduled.addAll(takeExecutors(currComp, numExecs - i, componentMap, compToExecsToSchedule));
                 }
    -            Set<Component> sortedNeighbors = sortNeighbors(currComp, neighbors);
    -            Queue<ExecutorDetails> currCompExesToSched = compToExecsToSchedule.get(currComp.getId());
    -
    -            boolean flag = false;
    -            do {
    -                flag = false;
    -                if (!currCompExesToSched.isEmpty()) {
    -                    execsScheduled.add(currCompExesToSched.poll());
    -                    flag = true;
    -                }
    +        }
    +
    +        LOG.info("The ordering result is {}", execsScheduled);
    +
    +        return execsScheduled;
    +    }
     
    -                for (Component neighborComp : sortedNeighbors) {
    -                    Queue<ExecutorDetails> neighborCompExesToSched =
    -                        compToExecsToSchedule.get(neighborComp.getId());
    -                    if (!neighborCompExesToSched.isEmpty()) {
    -                        execsScheduled.add(neighborCompExesToSched.poll());
    -                        flag = true;
    +    private List<ExecutorDetails> takeExecutors(Component currComp, int numExecs,
    +                                                final Map<String, Component> componentMap,
    +                                                final Map<String, Queue<ExecutorDetails>> compToExecsToSchedule) {
    +        List<ExecutorDetails> execsScheduled = new ArrayList<>();
    +        Queue<ExecutorDetails> currQueue = compToExecsToSchedule.get((currComp.getId()));
    +        Set<String> sortedChildren = getSortedChildren(currComp, componentMap);
    +
    +        execsScheduled.add(currQueue.poll());
    --- End diff --
    
    Can `currQueue.poll()` ever return null?  How do we handle that if it does?
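
For context, `java.util.Queue.poll()` returns `null` when the queue is empty, so an unguarded `execsScheduled.add(currQueue.poll())` would append a `null` entry to the list in that case. One possible guard, as a sketch only; `PollGuardSketch` and `takeOne` are hypothetical names, not part of the pull request:

```java
import java.util.List;
import java.util.Queue;

final class PollGuardSketch {
    // Moves at most one element from the queue to the result list.
    // Returns true if an element was moved, false if the queue was empty.
    static <T> boolean takeOne(Queue<T> queue, List<T> result) {
        T next = queue.poll(); // null when the queue is empty
        if (next == null) {
            return false;
        }
        result.add(next);
        return true;
    }
}
```

A caller could use the boolean result to stop recursing into children once the current component has no executors left.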


---

[GitHub] storm pull request #2623: [STORM-2687] Group Topology executors by network p...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2623#discussion_r179571966
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java ---
    @@ -477,45 +414,136 @@ protected String nodeToRack(RAS_Node node) {
             List<ExecutorDetails> execsScheduled = new LinkedList<>();
     
             Map<String, Queue<ExecutorDetails>> compToExecsToSchedule = new HashMap<>();
    -        for (Component component : componentMap.values()) {
    -            compToExecsToSchedule.put(component.getId(), new LinkedList<ExecutorDetails>());
    +        for (Map.Entry<String, Component> componentEntry: componentMap.entrySet()) {
    +            Component component = componentEntry.getValue();
    +            compToExecsToSchedule.put(component.getId(), new LinkedList<>());
                 for (ExecutorDetails exec : component.getExecs()) {
                     if (unassignedExecutors.contains(exec)) {
                         compToExecsToSchedule.get(component.getId()).add(exec);
    +                    LOG.info("{} has unscheduled executor {}", component.getId(), exec);
                     }
                 }
             }
     
    -        Set<Component> sortedComponents = sortComponents(componentMap);
    -        sortedComponents.addAll(componentMap.values());
    +        List<Component> sortedComponents = topologicalSortComponents(componentMap);
     
    -        for (Component currComp : sortedComponents) {
    -            Map<String, Component> neighbors = new HashMap<String, Component>();
    -            for (String compId : Sets.union(currComp.getChildren(), currComp.getParents())) {
    -                neighbors.put(compId, componentMap.get(compId));
    +        for (Component currComp: sortedComponents) {
    +            int numExecs = compToExecsToSchedule.get(currComp.getId()).size();
    +            for (int i = 0; i < numExecs; i++) {
    +                execsScheduled.addAll(takeExecutors(currComp, numExecs - i, componentMap, compToExecsToSchedule));
                 }
    -            Set<Component> sortedNeighbors = sortNeighbors(currComp, neighbors);
    -            Queue<ExecutorDetails> currCompExesToSched = compToExecsToSchedule.get(currComp.getId());
    -
    -            boolean flag = false;
    -            do {
    -                flag = false;
    -                if (!currCompExesToSched.isEmpty()) {
    -                    execsScheduled.add(currCompExesToSched.poll());
    -                    flag = true;
    -                }
    +        }
    +
    +        LOG.info("The ordering result is {}", execsScheduled);
    +
    +        return execsScheduled;
    +    }
     
    -                for (Component neighborComp : sortedNeighbors) {
    -                    Queue<ExecutorDetails> neighborCompExesToSched =
    -                        compToExecsToSchedule.get(neighborComp.getId());
    -                    if (!neighborCompExesToSched.isEmpty()) {
    -                        execsScheduled.add(neighborCompExesToSched.poll());
    -                        flag = true;
    +    private List<ExecutorDetails> takeExecutors(Component currComp, int numExecs,
    +                                                final Map<String, Component> componentMap,
    +                                                final Map<String, Queue<ExecutorDetails>> compToExecsToSchedule) {
    +        List<ExecutorDetails> execsScheduled = new ArrayList<>();
    +        Queue<ExecutorDetails> currQueue = compToExecsToSchedule.get((currComp.getId()));
    +        Set<String> sortedChildren = getSortedChildren(currComp, componentMap);
    +
    +        execsScheduled.add(currQueue.poll());
    +
    +        for (String childId: sortedChildren) {
    +            Component childComponent = componentMap.get(childId);
    +            Queue<ExecutorDetails> childQueue = compToExecsToSchedule.get(childId);
    +            int childNumExecs = childQueue.size();
    +            if (childNumExecs == 0) {
    +                continue;
    +            }
    +            int numExecsToTake = 1;
    +            if (isShuffleFromParentToChild(currComp, childComponent)) {
    +                // if it's shuffle grouping, truncate
    +                numExecsToTake = Math.max(1, childNumExecs / numExecs);
    +            } // otherwise, one-by-one
    +
    +            for (int i = 0; i < numExecsToTake; i++) {
    +                execsScheduled.addAll(takeExecutors(childComponent, childNumExecs, componentMap, compToExecsToSchedule));
    +            }
    +        }
    +
    +        return execsScheduled;
    +    }
    +
    +    private Set<String> getSortedChildren(Component component, final Map<String, Component> componentMap) {
    +        Set<String> children = component.getChildren();
    +        Set<String> sortedChildren =
    +                new TreeSet<String>((o1, o2) -> {
    +                    Component child1 = componentMap.get(o1);
    +                    Component child2 = componentMap.get(o2);
    +                    boolean child1IsShuffle = isShuffleFromParentToChild(component, child1);
    +                    boolean child2IsShuffle = isShuffleFromParentToChild(component, child2);
    +
    +                    if (child1IsShuffle && child2IsShuffle) {
    +                        return o1.compareTo(o2);
    +                    } else if (child1IsShuffle) {
    +                        return 1;
    +                    } else {
    +                        return -1;
    +                    }
    +                });
    +        sortedChildren.addAll(children);
    +        return sortedChildren;
    +    }
    +
    +    private boolean isShuffleFromParentToChild(Component parent, Component child) {
    --- End diff --
    
    Nit: could we rename this from `isShuffleFromParentToChild` to something more like `hasLocalityAwareGroupingFromParentToChild`?  I know it is longer, but in the future we may want to offer a way to expand this to more than just shuffle.
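
A rough sketch of why the broader name could age better: if the set of groupings treated as locality-aware grows, only the set changes and callers keep using the same predicate. `SketchGrouping`, `LocalityGroupingSketch`, and the enum values are hypothetical and for illustration only; they are not Storm's actual grouping types.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical grouping kinds for illustration; not Storm's grouping types.
enum SketchGrouping { SHUFFLE, LOCAL_OR_SHUFFLE, FIELDS, ALL }

final class LocalityGroupingSketch {
    // Groupings treated as locality-aware. Only SHUFFLE for now; others could
    // be added here later without renaming the predicate below.
    private static final Set<SketchGrouping> LOCALITY_AWARE =
            EnumSet.of(SketchGrouping.SHUFFLE);

    // True if the grouping on the parent-to-child edge is locality-aware.
    static boolean hasLocalityAwareGrouping(SketchGrouping parentToChild) {
        return LOCALITY_AWARE.contains(parentToChild);
    }
}
```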


---