You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@storm.apache.org by HeartSaVioR <gi...@git.apache.org> on 2016/07/11 09:39:52 UTC

[GitHub] storm pull request #1359: STORM-1239: port backtype.storm.scheduler.Isolatio...

Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1359#discussion_r70228042
  
    --- Diff: storm-core/src/jvm/org/apache/storm/scheduler/IsolationScheduler.java ---
    @@ -0,0 +1,417 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.scheduler;
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.Comparator;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.TreeMap;
    +
    +import org.apache.commons.lang.Validate;
    +import org.apache.storm.utils.Utils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import org.apache.storm.Config;
    +import org.apache.storm.scheduler.Cluster;
    +import org.apache.storm.scheduler.ExecutorDetails;
    +import org.apache.storm.scheduler.IScheduler;
    +import org.apache.storm.scheduler.SchedulerAssignment;
    +import org.apache.storm.scheduler.Topologies;
    +import org.apache.storm.scheduler.TopologyDetails;
    +import org.apache.storm.scheduler.WorkerSlot;
    +
    +// for each isolated topology:
    +//   compute even distribution of executors -> workers on the number of workers specified for the topology
    +//   compute distribution of workers to machines
    +// determine host -> list of [slot, topology id, executors]
    +// iterate through hosts and: a machine is good if:
    +//   1. only running workers from one isolated topology
    +//   2. all workers running on it match one of the distributions of executors for that topology
    +//   3. matches one of the # of workers
    +// blacklist the good hosts and remove those workers from the list of need to be assigned workers
    +// otherwise unassign all other workers for isolated topologies if assigned
    +public class IsolationScheduler implements IScheduler {
    +    private final static Logger LOG = LoggerFactory.getLogger(IsolationScheduler.class);
    +
    +    private Map<String, Number> isoMachines;
    +
    +    @Override
    +    public void prepare(Map conf) {
    +        this.isoMachines = (Map<String, Number>) conf.get(Config.ISOLATION_SCHEDULER_MACHINES);
    +        Validate.notEmpty(isoMachines);
    +    }
    +
    +    // get host -> all assignable worker slots for non-blacklisted machines (assigned or not assigned)
    +    // will then have a list of machines that need to be assigned (machine -> [topology, list of list of executors])
    +    // match each spec to a machine (who has the right number of workers), free everything else on that machine and assign those slots (do one topology at a time)
    +    // blacklist all machines who had production slots defined
    +    // log isolated topologies who weren't able to get enough slots / machines
    +    // run default scheduler on isolated topologies that didn't have enough slots + non-isolated topologies on remaining machines
    +    // set blacklist to what it was initially
    +    @Override
    +    public void schedule(Topologies topologies, Cluster cluster) {
    +        Set<String> origBlacklist = cluster.getBlacklistedHosts();
    +        List<TopologyDetails> isoTopologies = isolatedTopologies(topologies.getTopologies());
    +        Set<String> isoIds = isolatedTopoplogyIds(isoTopologies);
    +        Map<String, Set<Set<ExecutorDetails>>> topologyWorkerSpecs = topologyWorkerSpecs(isoTopologies);
    +        Map<String, Map<Integer, Integer>> topologyMachineDistributions = topologyMachineDistributions(isoTopologies);
    +        Map<String, List<AssignmentInfo>> hostAssignments = hostAssignments(cluster);
    +
    +        for (Map.Entry<String, List<AssignmentInfo>> entry : hostAssignments.entrySet()) {
    +            List<AssignmentInfo> assignments = entry.getValue();
    +            String topologyId = assignments.get(0).getTopologyId();
    +            Map<Integer, Integer> distribution = topologyMachineDistributions.get(topologyId);
    +            Set<Set<ExecutorDetails>> workerSpecs = topologyWorkerSpecs.get(topologyId);
    +            int numWorkers = assignments.size();
    +
    +            if (isoIds.contains(topologyId)
    +                    && checkAssignmentTopology(assignments, topologyId)
    +                    && distribution.containsKey(numWorkers)
    +                    && checkAssignmentWorkerSpecs(assignments, workerSpecs)) {
    +                decrementDistribution(distribution, numWorkers);
    +                for (AssignmentInfo ass : assignments) {
    +                    workerSpecs.remove(ass.getExecutors());
    +                }
    +                cluster.blacklistHost(entry.getKey());
    +            } else {
    +                for (AssignmentInfo ass : assignments) {
    +                    if (isoIds.contains(ass.getTopologyId())) {
    +                        cluster.freeSlot(ass.getWorkerSlot());
    +                    }
    +                }
    +            }
    +        }
    +
    +        Map<String, Set<WorkerSlot>> hostUsedSlots = hostUsedSlots(cluster);
    +        LinkedList<HostAssignableSlots> hss = hostAssignableSlots(cluster);
    +        List<String> failedTopologyIds = new ArrayList<String>();
    +        for (Map.Entry<String, Set<Set<ExecutorDetails>>> entry : topologyWorkerSpecs.entrySet()) {
    +            String topologyId = entry.getKey();
    +            Set<Set<ExecutorDetails>> executorSet = entry.getValue();
    +            if (executorSet != null && executorSet.size() > 0) {
    +                failedTopologyIds.add(topologyId);
    +            }
    +            List<Integer> workerNum = distributionSortedAmts(topologyMachineDistributions.get(topologyId));
    +            for (Integer num : workerNum) {
    +                HostAssignableSlots hostSlots = hss.peek();
    +                List<WorkerSlot> slot = hostSlots != null ? hostSlots.getWorkerSlots() : null;
    +
    +                if (slot != null && slot.size() >= num.intValue()) {
    +                    hss.poll();
    +                    cluster.freeSlots(hostUsedSlots.get(hostSlots.getHostName()));
    +                    for (WorkerSlot tmpSlot : slot.subList(0, num)) {
    +                        Set<ExecutorDetails> executor = removeElemFromSet(executorSet);
    +                        cluster.assign(tmpSlot, topologyId, executor);
    +                    }
    +                    cluster.blacklistHost(hostSlots.getHostName());
    +                }
    +            }
    +        }
    +
    +        if (failedTopologyIds.size() > 0) {
    +            LOG.warn("Unable to isolate topologies " + failedTopologyIds
    +                    + ". No machine had enough worker slots to run the remaining workers for these topologies. "
    +                    + "Clearing all other resources and will wait for enough resources for "
    +                    + "isolated topologies before allocating any other resources.");
    +            // clear workers off all hosts that are not blacklisted
    +            Map<String, Set<WorkerSlot>> usedSlots = hostUsedSlots(cluster);
    +            Set<Map.Entry<String, Set<WorkerSlot>>> entries = usedSlots.entrySet();
    +            for (Map.Entry<String, Set<WorkerSlot>> entry : entries) {
    +                if (!cluster.isBlacklistedHost(entry.getKey())) {
    +                    cluster.freeSlots(entry.getValue());
    +                }
    +            }
    +        } else {
    +            // run default scheduler on non-isolated topologies
    +            Set<String> allocatedTopologies = allocatedTopologies(topologyWorkerSpecs);
    +            Topologies leftOverTopologies = leftoverTopologies(topologies, allocatedTopologies);
    +            DefaultScheduler.defaultSchedule(leftOverTopologies, cluster);
    +        }
    +        cluster.setBlacklistedHosts(origBlacklist);
    +    }
    +
    +    public Set<ExecutorDetails> removeElemFromSet(Set<Set<ExecutorDetails>> executorsSets) {
    +        Set<ExecutorDetails> elem = executorsSets.iterator().next();
    +        executorsSets.remove(elem);
    +        return elem;
    +    }
    +
    +    private List<TopologyDetails> isolatedTopologies(Collection<TopologyDetails> topologies) {
    +        Set<String> topologyNames = isoMachines.keySet();
    +        List<TopologyDetails> isoTopologies = new ArrayList<TopologyDetails>();
    +        for (TopologyDetails topo : topologies) {
    +            if (topologyNames.contains(topo.getName())) {
    +                isoTopologies.add(topo);
    +            }
    +        }
    +        return isoTopologies;
    +    }
    +
    +    private Set<String> isolatedTopoplogyIds(List<TopologyDetails> topologies) {
    +        Set<String> ids = new HashSet<String>();
    +        if (topologies != null && topologies.size() > 0) {
    +            for (TopologyDetails topology : topologies) {
    +                ids.add(topology.getId());
    +            }
    +        }
    +        return ids;
    +    }
    +
    +    // map from topology id -> set of sets of executors
    +    private Map<String, Set<Set<ExecutorDetails>>> topologyWorkerSpecs(List<TopologyDetails> topologies) {
    +        Map<String, Set<Set<ExecutorDetails>>> workerSpecs = new HashMap<String, Set<Set<ExecutorDetails>>>();
    +        for (TopologyDetails topology : topologies) {
    +            workerSpecs.put(topology.getId(), computeWorkerSpecs(topology));
    +        }
    +        return workerSpecs;
    +    }
    +
    +    private Map<String, List<AssignmentInfo>> hostAssignments(Cluster cluster) {
    +        Collection<SchedulerAssignment> assignmentValues =  cluster.getAssignments().values();
    +        Map<String, List<AssignmentInfo>> hostAssignments = new HashMap<String, List<AssignmentInfo>>();
    +
    +        for (SchedulerAssignment sa : assignmentValues) {
    +            Map<WorkerSlot, List<ExecutorDetails>> slotExecutors = Utils.reverseMap(sa.getExecutorToSlot());
    +            Set<Map.Entry<WorkerSlot, List<ExecutorDetails>>> entries = slotExecutors.entrySet();
    +            for (Map.Entry<WorkerSlot, List<ExecutorDetails>> entry : entries) {
    +                WorkerSlot slot = entry.getKey();
    +                List<ExecutorDetails> executors = entry.getValue();
    +
    +                String host = cluster.getHost(slot.getNodeId());
    +                AssignmentInfo ass = new AssignmentInfo(slot, sa.getTopologyId(), new HashSet<ExecutorDetails>(executors));
    +                List<AssignmentInfo> executorList = hostAssignments.get(host);
    +                if (executorList == null) {
    +                    executorList = new ArrayList<AssignmentInfo>();
    +                    hostAssignments.put(host, executorList);
    +                }
    +                executorList.add(ass);
    +            }
    +        }
    +        return hostAssignments;
    +    }
    +
    +    private Set<Set<ExecutorDetails>> computeWorkerSpecs(TopologyDetails topology) {
    --- End diff --
    
    It might be better to keep the value name to `details`, or use full name `topologyDetails`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---