Posted to commits@storm.apache.org by ka...@apache.org on 2016/12/25 10:32:52 UTC

[1/3] storm git commit: port IsolationScheduler to java

Repository: storm
Updated Branches:
  refs/heads/master d5acec9e3 -> 273fc4b74


port IsolationScheduler to java


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fca8ebb4
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fca8ebb4
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fca8ebb4

Branch: refs/heads/master
Commit: fca8ebb404127c6b07d01832563a21ccb3814238
Parents: f118060
Author: Xin Wang <be...@163.com>
Authored: Mon Apr 4 06:51:01 2016 +0800
Committer: Xin Wang <be...@163.com>
Committed: Sat Dec 24 19:15:17 2016 +0800

----------------------------------------------------------------------
 .../storm/scheduler/IsolationScheduler.clj      | 232 ----------
 .../apache/storm/scheduler/EvenScheduler.java   |   9 +-
 .../storm/scheduler/IsolationScheduler.java     | 425 +++++++++++++++++++
 .../jvm/org/apache/storm/utils/Container.java   |  33 --
 4 files changed, 431 insertions(+), 268 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/fca8ebb4/storm-core/src/clj/org/apache/storm/scheduler/IsolationScheduler.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/scheduler/IsolationScheduler.clj b/storm-core/src/clj/org/apache/storm/scheduler/IsolationScheduler.clj
deleted file mode 100644
index 0446f22..0000000
--- a/storm-core/src/clj/org/apache/storm/scheduler/IsolationScheduler.clj
+++ /dev/null
@@ -1,232 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.scheduler.IsolationScheduler
-  (:use [org.apache.storm util config log])
-  (:import [org.apache.storm.scheduler DefaultScheduler])
-  (:import [java.util HashSet Set List LinkedList ArrayList Map HashMap]
-           [org.apache.storm.utils])
-  (:import [org.apache.storm.utils Utils Container])
-  (:import [org.apache.storm.scheduler IScheduler Topologies
-            Cluster TopologyDetails WorkerSlot SchedulerAssignment
-            EvenScheduler ExecutorDetails]
-           [org.apache.storm.utils Utils])
-  (:gen-class
-    :init init
-    :constructors {[] []}
-    :state state 
-    :implements [org.apache.storm.scheduler.IScheduler]))
-
-(defn -init []
-  [[] (Container.)])
-
-(defn -prepare [this conf]
-  (.. this state (set conf)))
-
-(defn- repeat-seq
-  ([aseq]
-    (apply concat (repeat aseq)))
-  ([amt aseq]
-    (apply concat (repeat amt aseq))))
-
-;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
-(defn- compute-worker-specs "Returns mutable set of sets of executors"
-  [^TopologyDetails details]
-  (->> (.getExecutorToComponent details)
-       (Utils/reverseMap)
-       clojurify-structure
-       (map second)
-       (apply concat)
-       (map vector (repeat-seq (range (.getNumWorkers details))))
-       (group-by first)
-       (map-val #(map second %))
-       vals
-       (map set)
-       (HashSet.)
-       ))
-
-(defn isolated-topologies [conf topologies]
-  (let [tset (-> conf (get ISOLATION-SCHEDULER-MACHINES) keys set)]
-    (filter (fn [^TopologyDetails t] (contains? tset (.getName t))) topologies)
-    ))
-
-;; map from topology id -> set of sets of executors
-(defn topology-worker-specs [iso-topologies]
-  (->> iso-topologies
-       (map (fn [t] {(.getId t) (compute-worker-specs t)}))
-       (apply merge)))
-
-(defn machine-distribution [conf ^TopologyDetails topology]
-  (let [name->machines (get conf ISOLATION-SCHEDULER-MACHINES)
-        machines (get name->machines (.getName topology))
-        workers (.getNumWorkers topology)]
-    (-> (Utils/integerDivided workers machines)
-        clojurify-structure
-        (dissoc 0)
-        (HashMap.)
-        )))
-
-(defn topology-machine-distribution [conf iso-topologies]
-  (->> iso-topologies
-       (map (fn [t] {(.getId t) (machine-distribution conf t)}))
-       (apply merge)))
-
-(defn host-assignments [^Cluster cluster]
-  (letfn [(to-slot-specs [^SchedulerAssignment ass]
-            (->> ass
-                 .getExecutorToSlot
-                 (Utils/reverseMap)
-                 clojurify-structure
-                 (map (fn [[slot executors]]
-                        [slot (.getTopologyId ass) (set executors)]))))]
-  (->> cluster
-       .getAssignments
-       vals
-       (mapcat to-slot-specs)
-       (group-by (fn [[^WorkerSlot slot & _]] (.getHost cluster (.getNodeId slot))))
-       )))
-
-(defn- decrement-distribution! [^Map distribution value]
-  (let [v (-> distribution (get value) dec)]
-    (if (zero? v)
-      (.remove distribution value)
-      (.put distribution value v))))
-
-;; returns list of list of slots, reverse sorted by number of slots
-(defn- host-assignable-slots [^Cluster cluster]
-  (-<> cluster
-       .getAssignableSlots
-       (group-by #(.getHost cluster (.getNodeId ^WorkerSlot %)) <>)
-       (dissoc <> nil)
-       (sort-by #(-> % second count -) <>)
-       shuffle
-       (LinkedList. <>)
-       ))
-
-(defn- host->used-slots [^Cluster cluster]
-  (->> cluster
-       .getUsedSlots
-       (group-by #(.getHost cluster (.getNodeId ^WorkerSlot %)))
-       ))
-
-(defn- distribution->sorted-amts [distribution]
-  (->> distribution
-       (mapcat (fn [[val amt]] (repeat amt val)))
-       (sort-by -)
-       ))
-
-(defn- allocated-topologies [topology-worker-specs]
-  (->> topology-worker-specs
-    (filter (fn [[_ worker-specs]] (empty? worker-specs)))
-    (map first)
-    set
-    ))
-
-(defn- leftover-topologies [^Topologies topologies filter-ids-set]
-  (->> topologies
-       .getTopologies
-       (filter (fn [^TopologyDetails t] (not (contains? filter-ids-set (.getId t)))))
-       (map (fn [^TopologyDetails t] {(.getId t) t}))
-       (apply merge)
-       (Topologies.)
-       ))
-
-;; for each isolated topology:
-;;   compute even distribution of executors -> workers on the number of workers specified for the topology
-;;   compute distribution of workers to machines
-;; determine host -> list of [slot, topology id, executors]
-;; iterate through hosts and: a machine is good if:
-;;   1. only running workers from one isolated topology
-;;   2. all workers running on it match one of the distributions of executors for that topology
-;;   3. matches one of the # of workers
-;; blacklist the good hosts and remove those workers from the list of need to be assigned workers
-;; otherwise unassign all other workers for isolated topologies if assigned
-
-(defn remove-elem-from-set! [^Set aset]
-  (let [elem (-> aset .iterator .next)]
-    (.remove aset elem)
-    elem
-    ))
-
-;; get host -> all assignable worker slots for non-blacklisted machines (assigned or not assigned)
-;; will then have a list of machines that need to be assigned (machine -> [topology, list of list of executors])
-;; match each spec to a machine (who has the right number of workers), free everything else on that machine and assign those slots (do one topology at a time)
-;; blacklist all machines who had production slots defined
-;; log isolated topologies who weren't able to get enough slots / machines
-;; run default scheduler on isolated topologies that didn't have enough slots + non-isolated topologies on remaining machines
-;; set blacklist to what it was initially
-(defn -schedule [this ^Topologies topologies ^Cluster cluster]
-  (let [conf (.. this state (get))
-        orig-blacklist (HashSet. (.getBlacklistedHosts cluster))
-        iso-topologies (isolated-topologies conf (.getTopologies topologies))
-        iso-ids-set (->> iso-topologies (map #(.getId ^TopologyDetails %)) set)
-        topology-worker-specs (topology-worker-specs iso-topologies)
-        topology-machine-distribution (topology-machine-distribution conf iso-topologies)
-        host-assignments (host-assignments cluster)]
-    (doseq [[host assignments] host-assignments]
-      (let [top-id (-> assignments first second)
-            distribution (get topology-machine-distribution top-id)
-            ^Set worker-specs (get topology-worker-specs top-id)
-            num-workers (count assignments)
-            ]
-        (if (and (contains? iso-ids-set top-id)
-                 (every? #(= (second %) top-id) assignments)
-                 (contains? distribution num-workers)
-                 (every? #(contains? worker-specs (nth % 2)) assignments))
-          (do (decrement-distribution! distribution num-workers)
-              (doseq [[_ _ executors] assignments] (.remove worker-specs executors))
-              (.blacklistHost cluster host))
-          (doseq [[slot top-id _] assignments]
-            (when (contains? iso-ids-set top-id)
-              (.freeSlot cluster slot)
-              ))
-          )))
-    
-    (let [host->used-slots (host->used-slots cluster)
-          ^LinkedList sorted-assignable-hosts (host-assignable-slots cluster)]
-      ;; TODO: can improve things further by ordering topologies in terms of who needs the least workers
-      (doseq [[top-id worker-specs] topology-worker-specs
-              :let [amts (distribution->sorted-amts (get topology-machine-distribution top-id))]]
-        (doseq [amt amts
-                :let [[host host-slots] (.peek sorted-assignable-hosts)]]
-          (when (and host-slots (>= (count host-slots) amt))
-            (.poll sorted-assignable-hosts)
-            (.freeSlots cluster (get host->used-slots host))
-            (doseq [slot (take amt host-slots)
-                    :let [executors-set (remove-elem-from-set! worker-specs)]]
-              (.assign cluster slot top-id executors-set))
-            (.blacklistHost cluster host))
-          )))
-    
-    (let [failed-iso-topologies (->> topology-worker-specs
-                                  (mapcat (fn [[top-id worker-specs]]
-                                    (if-not (empty? worker-specs) [top-id])
-                                    )))]
-      (if (empty? failed-iso-topologies)
-        ;; run default scheduler on non-isolated topologies
-        (-<> topology-worker-specs
-             allocated-topologies
-             (leftover-topologies topologies <>)
-             (DefaultScheduler/defaultSchedule <> cluster))
-        (do
-          (log-warn "Unable to isolate topologies " (pr-str failed-iso-topologies) ". No machine had enough worker slots to run the remaining workers for these topologies. Clearing all other resources and will wait for enough resources for isolated topologies before allocating any other resources.")
-          ;; clear workers off all hosts that are not blacklisted
-          (doseq [[host slots] (host->used-slots cluster)]
-            (if-not (.isBlacklistedHost cluster host)
-              (.freeSlots cluster slots)
-              )))
-        ))
-    (.setBlacklistedHosts cluster orig-blacklist)
-    ))
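
For reference, compute-worker-specs above (and its Java counterpart computeWorkerSpecs later in this diff) deals a topology's executors round-robin into numWorkers buckets of roughly equal size. A minimal sketch of the same idea, using five hypothetical single-task executors rather than code from this commit (assumes java.util.* and org.apache.storm.scheduler.ExecutorDetails are imported):

    List<ExecutorDetails> executors = new ArrayList<ExecutorDetails>();
    for (int task = 0; task < 5; task++) {
        executors.add(new ExecutorDetails(task, task));     // five single-task executors
    }
    int numWorkers = 2;
    Map<Integer, Set<ExecutorDetails>> buckets = new HashMap<Integer, Set<ExecutorDetails>>();
    for (int i = 0; i < executors.size(); i++) {
        int bucket = i % numWorkers;                         // deal executors round-robin
        Set<ExecutorDetails> spec = buckets.get(bucket);
        if (spec == null) {
            spec = new HashSet<ExecutorDetails>();
            buckets.put(bucket, spec);
        }
        spec.add(executors.get(i));
    }
    // bucket 0 now holds tasks {0, 2, 4} and bucket 1 holds {1, 3}; the scheduler keeps
    // new HashSet<Set<ExecutorDetails>>(buckets.values()) as the worker specs for the topology.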

http://git-wip-us.apache.org/repos/asf/storm/blob/fca8ebb4/storm-core/src/jvm/org/apache/storm/scheduler/EvenScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/EvenScheduler.java b/storm-core/src/jvm/org/apache/storm/scheduler/EvenScheduler.java
index dec0a7b..a3822a6 100644
--- a/storm-core/src/jvm/org/apache/storm/scheduler/EvenScheduler.java
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/EvenScheduler.java
@@ -105,12 +105,15 @@ public class EvenScheduler implements IScheduler {
         int totalSlotsToUse = Math.min(topology.getNumWorkers(), availableSlots.size() + aliveAssigned.size());
 
         List<WorkerSlot> sortedList = sortSlots(availableSlots);
-        if (sortedList == null || sortedList.size() < (totalSlotsToUse - aliveAssigned.size())) {
-            LOG.error("Available slots are not enough for topology: {}", topology.getName());
+        if (sortedList == null) {
+            LOG.error("No available slots for topology: {}", topology.getName());
             return new HashMap<ExecutorDetails, WorkerSlot>();
         }
 
-        List<WorkerSlot> reassignSlots = sortedList.subList(0, totalSlotsToUse - aliveAssigned.size());
+        // allow requesting more slots than are currently available
+        int toIndex = (totalSlotsToUse - aliveAssigned.size()) > sortedList.size() ? sortedList.size() : (totalSlotsToUse - aliveAssigned.size());
+        List<WorkerSlot> reassignSlots = sortedList.subList(0, toIndex);
+
         Set<ExecutorDetails> aliveExecutors = new HashSet<ExecutorDetails>();
         for (List<ExecutorDetails> list : aliveAssigned.values()) {
             aliveExecutors.addAll(list);
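
The conditional expression introduced above is simply a clamp: it caps the number of slots to reassign at what is actually free, so a topology that requests more workers than there are available slots now gets the available slots instead of an empty assignment. Expressed with Math.min (same variable names as the hunk, shown only as a sketch of the intent, not the committed code):

    int needed = totalSlotsToUse - aliveAssigned.size();
    int toIndex = Math.min(needed, sortedList.size());
    List<WorkerSlot> reassignSlots = sortedList.subList(0, toIndex);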

http://git-wip-us.apache.org/repos/asf/storm/blob/fca8ebb4/storm-core/src/jvm/org/apache/storm/scheduler/IsolationScheduler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/scheduler/IsolationScheduler.java b/storm-core/src/jvm/org/apache/storm/scheduler/IsolationScheduler.java
new file mode 100644
index 0000000..665e297
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/scheduler/IsolationScheduler.java
@@ -0,0 +1,425 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.scheduler;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.commons.lang.Validate;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.Config;
+import org.apache.storm.scheduler.Cluster;
+import org.apache.storm.scheduler.ExecutorDetails;
+import org.apache.storm.scheduler.IScheduler;
+import org.apache.storm.scheduler.SchedulerAssignment;
+import org.apache.storm.scheduler.Topologies;
+import org.apache.storm.scheduler.TopologyDetails;
+import org.apache.storm.scheduler.WorkerSlot;
+
+// for each isolated topology:
+//   compute even distribution of executors -> workers on the number of workers specified for the topology
+//   compute distribution of workers to machines
+// determine host -> list of [slot, topology id, executors]
+// iterate through hosts and: a machine is good if:
+//   1. only running workers from one isolated topology
+//   2. all workers running on it match one of the distributions of executors for that topology
+//   3. matches one of the # of workers
+// blacklist the good hosts and remove those workers from the list of need to be assigned workers
+// otherwise unassign all other workers for isolated topologies if assigned
+public class IsolationScheduler implements IScheduler {
+    private final static Logger LOG = LoggerFactory.getLogger(IsolationScheduler.class);
+
+    private Map<String, Number> isoMachines;
+
+    @Override
+    public void prepare(Map conf) {
+        this.isoMachines = (Map<String, Number>) conf.get(Config.ISOLATION_SCHEDULER_MACHINES);
+        Validate.notEmpty(isoMachines);
+    }
+
+    // get host -> all assignable worker slots for non-blacklisted machines (assigned or not assigned)
+    // will then have a list of machines that need to be assigned (machine -> [topology, list of list of executors])
+    // match each spec to a machine (who has the right number of workers), free everything else on that machine and assign those slots (do one topology at a time)
+    // blacklist all machines who had production slots defined
+    // log isolated topologies who weren't able to get enough slots / machines
+    // run default scheduler on isolated topologies that didn't have enough slots + non-isolated topologies on remaining machines
+    // set blacklist to what it was initially
+    @Override
+    public void schedule(Topologies topologies, Cluster cluster) {
+        Set<String> origBlacklist = cluster.getBlacklistedHosts();
+        List<TopologyDetails> isoTopologies = isolatedTopologies(topologies.getTopologies());
+        Set<String> isoIds = extractTopologyIds(isoTopologies);
+        Map<String, Set<Set<ExecutorDetails>>> topologyWorkerSpecs = topologyWorkerSpecs(isoTopologies);
+        Map<String, Map<Integer, Integer>> topologyMachineDistributions = topologyMachineDistributions(isoTopologies);
+        Map<String, List<AssignmentInfo>> hostAssignments = hostAssignments(cluster);
+
+        for (Map.Entry<String, List<AssignmentInfo>> entry : hostAssignments.entrySet()) {
+            List<AssignmentInfo> assignments = entry.getValue();
+            String topologyId = assignments.get(0).getTopologyId();
+            Map<Integer, Integer> distribution = topologyMachineDistributions.get(topologyId);
+            Set<Set<ExecutorDetails>> workerSpecs = topologyWorkerSpecs.get(topologyId);
+            int numWorkers = assignments.size();
+
+            if (isoIds.contains(topologyId)
+                    && checkAssignmentTopology(assignments, topologyId)
+                    && distribution.containsKey(numWorkers)
+                    && checkAssignmentWorkerSpecs(assignments, workerSpecs)) {
+                decrementDistribution(distribution, numWorkers);
+                for (AssignmentInfo ass : assignments) {
+                    workerSpecs.remove(ass.getExecutors());
+                }
+                cluster.blacklistHost(entry.getKey());
+            } else {
+                for (AssignmentInfo ass : assignments) {
+                    if (isoIds.contains(ass.getTopologyId())) {
+                        cluster.freeSlot(ass.getWorkerSlot());
+                    }
+                }
+            }
+        }
+
+        Map<String, Set<WorkerSlot>> hostUsedSlots = hostToUsedSlots(cluster);
+        LinkedList<HostAssignableSlots> hss = hostAssignableSlots(cluster);
+        for (Map.Entry<String, Set<Set<ExecutorDetails>>> entry : topologyWorkerSpecs.entrySet()) {
+            String topologyId = entry.getKey();
+            Set<Set<ExecutorDetails>> executorSet = entry.getValue();
+            List<Integer> workerNum = distributionToSortedAmounts(topologyMachineDistributions.get(topologyId));
+            for (Integer num : workerNum) {
+                HostAssignableSlots hostSlots = hss.peek();
+                List<WorkerSlot> slot = hostSlots != null ? hostSlots.getWorkerSlots() : null;
+
+                if (slot != null && slot.size() >= num) {
+                    hss.poll();
+                    cluster.freeSlots(hostUsedSlots.get(hostSlots.getHostName()));
+                    for (WorkerSlot tmpSlot : slot.subList(0, num)) {
+                        Set<ExecutorDetails> executor = removeElemFromExecutorsSet(executorSet);
+                        cluster.assign(tmpSlot, topologyId, executor);
+                    }
+                    cluster.blacklistHost(hostSlots.getHostName());
+                }
+            }
+        }
+
+        List<String> failedTopologyIds = extractFailedTopologyIds(topologyWorkerSpecs);
+        if (failedTopologyIds.size() > 0) {
+            LOG.warn("Unable to isolate topologies " + failedTopologyIds
+                    + ". No machine had enough worker slots to run the remaining workers for these topologies. "
+                    + "Clearing all other resources and will wait for enough resources for "
+                    + "isolated topologies before allocating any other resources.");
+            // clear workers off all hosts that are not blacklisted
+            Map<String, Set<WorkerSlot>> usedSlots = hostToUsedSlots(cluster);
+            Set<Map.Entry<String, Set<WorkerSlot>>> entries = usedSlots.entrySet();
+            for (Map.Entry<String, Set<WorkerSlot>> entry : entries) {
+                if (!cluster.isBlacklistedHost(entry.getKey())) {
+                    cluster.freeSlots(entry.getValue());
+                }
+            }
+        } else {
+            // run default scheduler on non-isolated topologies
+            Set<String> allocatedTopologies = allocatedTopologies(topologyWorkerSpecs);
+            Topologies leftOverTopologies = leftoverTopologies(topologies, allocatedTopologies);
+            DefaultScheduler.defaultSchedule(leftOverTopologies, cluster);
+        }
+        cluster.setBlacklistedHosts(origBlacklist);
+    }
+
+    private Set<ExecutorDetails> removeElemFromExecutorsSet(Set<Set<ExecutorDetails>> executorsSets) {
+        Set<ExecutorDetails> elem = executorsSets.iterator().next();
+        executorsSets.remove(elem);
+        return elem;
+    }
+
+    private List<TopologyDetails> isolatedTopologies(Collection<TopologyDetails> topologies) {
+        Set<String> topologyNames = isoMachines.keySet();
+        List<TopologyDetails> isoTopologies = new ArrayList<TopologyDetails>();
+        for (TopologyDetails topo : topologies) {
+            if (topologyNames.contains(topo.getName())) {
+                isoTopologies.add(topo);
+            }
+        }
+        return isoTopologies;
+    }
+
+    private Set<String> extractTopologyIds(List<TopologyDetails> topologies) {
+        Set<String> ids = new HashSet<String>();
+        if (topologies != null && topologies.size() > 0) {
+            for (TopologyDetails topology : topologies) {
+                ids.add(topology.getId());
+            }
+        }
+        return ids;
+    }
+
+    private List<String> extractFailedTopologyIds(Map<String, Set<Set<ExecutorDetails>>> isoTopologyWorkerSpecs) {
+        List<String> failedTopologyIds = new ArrayList<String>();
+        for (Map.Entry<String, Set<Set<ExecutorDetails>>> topoWorkerSpecsEntry : isoTopologyWorkerSpecs.entrySet()){
+            Set<Set<ExecutorDetails>> workerSpecs = topoWorkerSpecsEntry.getValue();
+            if (workerSpecs != null && !workerSpecs.isEmpty()) {
+                failedTopologyIds.add(topoWorkerSpecsEntry.getKey());
+            }
+        }
+        return failedTopologyIds;
+    }
+
+    // map from topology id -> set of sets of executors
+    private Map<String, Set<Set<ExecutorDetails>>> topologyWorkerSpecs(List<TopologyDetails> topologies) {
+        Map<String, Set<Set<ExecutorDetails>>> workerSpecs = new HashMap<String, Set<Set<ExecutorDetails>>>();
+        for (TopologyDetails topology : topologies) {
+            workerSpecs.put(topology.getId(), computeWorkerSpecs(topology));
+        }
+        return workerSpecs;
+    }
+
+    private Map<String, List<AssignmentInfo>> hostAssignments(Cluster cluster) {
+        Collection<SchedulerAssignment> assignmentValues =  cluster.getAssignments().values();
+        Map<String, List<AssignmentInfo>> hostAssignments = new HashMap<String, List<AssignmentInfo>>();
+
+        for (SchedulerAssignment sa : assignmentValues) {
+            Map<WorkerSlot, List<ExecutorDetails>> slotExecutors = Utils.reverseMap(sa.getExecutorToSlot());
+            Set<Map.Entry<WorkerSlot, List<ExecutorDetails>>> entries = slotExecutors.entrySet();
+            for (Map.Entry<WorkerSlot, List<ExecutorDetails>> entry : entries) {
+                WorkerSlot slot = entry.getKey();
+                List<ExecutorDetails> executors = entry.getValue();
+
+                String host = cluster.getHost(slot.getNodeId());
+                AssignmentInfo ass = new AssignmentInfo(slot, sa.getTopologyId(), new HashSet<ExecutorDetails>(executors));
+                List<AssignmentInfo> executorList = hostAssignments.get(host);
+                if (executorList == null) {
+                    executorList = new ArrayList<AssignmentInfo>();
+                    hostAssignments.put(host, executorList);
+                }
+                executorList.add(ass);
+            }
+        }
+        return hostAssignments;
+    }
+
+    private Set<Set<ExecutorDetails>> computeWorkerSpecs(TopologyDetails topology) {
+        Map<String, List<ExecutorDetails>> compExecutors = Utils.reverseMap(topology.getExecutorToComponent());
+
+        List<ExecutorDetails> allExecutors = new ArrayList<ExecutorDetails>();
+        Collection<List<ExecutorDetails>> values = compExecutors.values();
+        for (List<ExecutorDetails> eList : values) {
+            allExecutors.addAll(eList);
+        }
+
+        int numWorkers = topology.getNumWorkers();
+        int bucketIndex = 0;
+        Map<Integer, Set<ExecutorDetails>> bucketExecutors = new HashMap<Integer, Set<ExecutorDetails>>(numWorkers);
+        for (ExecutorDetails executor : allExecutors) {
+            Set<ExecutorDetails> executors = bucketExecutors.get(bucketIndex);
+            if (executors == null) {
+                executors = new HashSet<ExecutorDetails>();
+                bucketExecutors.put(bucketIndex, executors);
+            }
+            executors.add(executor);
+            bucketIndex = (bucketIndex+1) % numWorkers;
+        }
+
+        return new HashSet<Set<ExecutorDetails>>(bucketExecutors.values());
+    }
+
+    private Map<String, Map<Integer, Integer>> topologyMachineDistributions(List<TopologyDetails> isoTopologies) {
+        Map<String, Map<Integer, Integer>> machineDistributions = new HashMap<String, Map<Integer, Integer>>();
+        for (TopologyDetails topology : isoTopologies) {
+            machineDistributions.put(topology.getId(), machineDistribution(topology));
+        }
+        return machineDistributions;
+    }
+
+    private Map<Integer, Integer> machineDistribution(TopologyDetails topology) {
+        int machineNum = isoMachines.get(topology.getName()).intValue();
+        int workerNum = topology.getNumWorkers();
+        TreeMap<Integer, Integer> distribution = Utils.integerDivided(workerNum, machineNum);
+
+        if (distribution.containsKey(0)) {
+            distribution.remove(0);
+        }
+        return distribution;
+    }
+
+    private boolean checkAssignmentTopology(List<AssignmentInfo> assignments, String topologyId) {
+        for (AssignmentInfo ass : assignments) {
+            if (!topologyId.equals(ass.getTopologyId())) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private boolean checkAssignmentWorkerSpecs(List<AssignmentInfo> assignments, Set<Set<ExecutorDetails>> workerSpecs) {
+        for (AssignmentInfo ass : assignments) {
+            if (!workerSpecs.contains(ass.getExecutors())) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private void decrementDistribution(Map<Integer, Integer> distribution, int value) {
+        Integer distValue = distribution.get(value);
+        if (distValue != null) {
+            int newValue = distValue - 1;
+            if (newValue == 0) {
+                distribution.remove(value);
+            } else {
+                distribution.put(value, newValue);
+            }
+        }
+    }
+
+    private Map<String, Set<WorkerSlot>> hostToUsedSlots(Cluster cluster) {
+        Collection<WorkerSlot> usedSlots = cluster.getUsedSlots();
+        Map<String, Set<WorkerSlot>> hostUsedSlots = new HashMap<String, Set<WorkerSlot>>();
+        for (WorkerSlot slot : usedSlots) {
+            String host = cluster.getHost(slot.getNodeId());
+            Set<WorkerSlot> slots = hostUsedSlots.get(host);
+            if (slots == null) {
+                slots = new HashSet<WorkerSlot>();
+                hostUsedSlots.put(host, slots);
+            }
+            slots.add(slot);
+        }
+        return hostUsedSlots;
+    }
+
+    // returns list of list of slots, reverse sorted by number of slots
+    private LinkedList<HostAssignableSlots> hostAssignableSlots(Cluster cluster) {
+        List<WorkerSlot> assignableSlots = cluster.getAssignableSlots();
+        Map<String, List<WorkerSlot>> hostAssignableSlots = new HashMap<String, List<WorkerSlot>>();
+        for (WorkerSlot slot : assignableSlots) {
+            String host = cluster.getHost(slot.getNodeId());
+            List<WorkerSlot> slots = hostAssignableSlots.get(host);
+            if (slots == null) {
+                slots = new ArrayList<WorkerSlot>();
+                hostAssignableSlots.put(host, slots);
+            }
+            slots.add(slot);
+        }
+        List<HostAssignableSlots> sortHostAssignSlots = new ArrayList<HostAssignableSlots>();
+        for (Map.Entry<String, List<WorkerSlot>> entry : hostAssignableSlots.entrySet()) {
+            sortHostAssignSlots.add(new HostAssignableSlots(entry.getKey(), entry.getValue()));
+        }
+        Collections.sort(sortHostAssignSlots, new Comparator<HostAssignableSlots>() {
+                    @Override
+                    public int compare(HostAssignableSlots o1, HostAssignableSlots o2) {
+                        return o2.getWorkerSlots().size() - o1.getWorkerSlots().size();
+                    }
+                });
+        Collections.shuffle(sortHostAssignSlots);
+
+        return new LinkedList<HostAssignableSlots>(sortHostAssignSlots);
+    }
+
+    private List<Integer> distributionToSortedAmounts(Map<Integer, Integer> distributions) {
+        List<Integer> sorts = new ArrayList<Integer>();
+        for (Map.Entry<Integer, Integer> entry : distributions.entrySet()) {
+            int workers = entry.getKey();
+            int machines = entry.getValue();
+            for (int i = 0; i < machines; i++) {
+                sorts.add(workers);
+            }
+        }
+        Collections.sort(sorts, new Comparator<Integer>() {
+            @Override
+            public int compare(Integer o1, Integer o2) {
+                return o2.intValue() - o1.intValue();
+            }
+        });
+
+        return sorts;
+    }
+
+    private Set<String> allocatedTopologies(Map<String, Set<Set<ExecutorDetails>>> topologyToWorkerSpecs) {
+        Set<String> allocatedTopologies = new HashSet<String>();
+        Set<Map.Entry<String, Set<Set<ExecutorDetails>>>> entries = topologyToWorkerSpecs.entrySet();
+        for (Map.Entry<String, Set<Set<ExecutorDetails>>> entry : entries) {
+            if (entry.getValue().isEmpty()) {
+                allocatedTopologies.add(entry.getKey());
+            }
+        }
+        return allocatedTopologies;
+    }
+
+    private Topologies leftoverTopologies(Topologies topologies, Set<String> filterIds) {
+        Collection<TopologyDetails> topos = topologies.getTopologies();
+        Map<String, TopologyDetails> leftoverTopologies = new HashMap<String, TopologyDetails>();
+        for (TopologyDetails topo : topos) {
+            String id = topo.getId();
+            if (!filterIds.contains(id)) {
+                leftoverTopologies.put(id, topo);
+            }
+        }
+        return new Topologies(leftoverTopologies);
+    }
+
+    class AssignmentInfo {
+        private WorkerSlot workerSlot;
+        private String topologyId;
+        private Set<ExecutorDetails> executors;
+
+        public AssignmentInfo(WorkerSlot workerSlot, String topologyId, Set<ExecutorDetails> executors) {
+            this.workerSlot = workerSlot;
+            this.topologyId = topologyId;
+            this.executors = executors;
+        }
+
+        public WorkerSlot getWorkerSlot() {
+            return workerSlot;
+        }
+
+        public String getTopologyId() {
+            return topologyId;
+        }
+
+        public Set<ExecutorDetails> getExecutors() {
+            return executors;
+        }
+
+    }
+
+    class HostAssignableSlots {
+        private String hostName;
+        private List<WorkerSlot> workerSlots;
+
+        public HostAssignableSlots(String hostName, List<WorkerSlot> workerSlots) {
+            this.hostName = hostName;
+            this.workerSlots = workerSlots;
+        }
+
+        public String getHostName() {
+            return hostName;
+        }
+
+        public List<WorkerSlot> getWorkerSlots() {
+            return workerSlots;
+        }
+
+    }
+}
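
For reference, Config.ISOLATION_SCHEDULER_MACHINES maps isolated topology names to the number of machines reserved for each of them. A minimal sketch of wiring the scheduler up by hand, with a hypothetical topology name (Nimbus normally supplies this from storm.yaml rather than code):

    Map<String, Object> conf = new HashMap<String, Object>();
    Map<String, Number> isoMachines = new HashMap<String, Number>();
    isoMachines.put("my-isolated-topology", 3);                  // reserve 3 machines
    conf.put(Config.ISOLATION_SCHEDULER_MACHINES, isoMachines);

    IsolationScheduler scheduler = new IsolationScheduler();
    scheduler.prepare(conf);
    // Nimbus then invokes scheduler.schedule(topologies, cluster) on every scheduling
    // round. machineDistribution(topology) splits the topology's workers across its
    // reserved machines with Utils.integerDivided, e.g. 7 workers on 3 machines gives
    // {3 workers -> 1 machine, 2 workers -> 2 machines}.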

http://git-wip-us.apache.org/repos/asf/storm/blob/fca8ebb4/storm-core/src/jvm/org/apache/storm/utils/Container.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/Container.java b/storm-core/src/jvm/org/apache/storm/utils/Container.java
deleted file mode 100644
index d875731..0000000
--- a/storm-core/src/jvm/org/apache/storm/utils/Container.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.utils;
-
-import java.io.Serializable;
-
-public class Container implements Serializable {
-    private Object object;
-
-    public Object get () {
-        return object;
-    }
-
-    public Container set (Object obj) {
-        object = obj;
-        return this;
-    }
-}
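
Container existed only so the Clojure gen-class above could stash the conf handed to -prepare in its mutable :state. With the scheduler now written in Java that indirection is unnecessary, and the port simply keeps the value in an instance field, as seen in the new IsolationScheduler.java above:

    private Map<String, Number> isoMachines;    // populated once in prepare(conf)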


[2/3] storm git commit: Merge branch 'STORM-1239' of https://github.com/vesense/storm into STORM-1239-merge

Posted by ka...@apache.org.
Merge branch 'STORM-1239' of https://github.com/vesense/storm into STORM-1239-merge


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/33e7089d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/33e7089d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/33e7089d

Branch: refs/heads/master
Commit: 33e7089d75be896f4bb4ee11f6aa2e8be00d7da6
Parents: d5acec9 fca8ebb
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sun Dec 25 19:25:58 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sun Dec 25 19:25:58 2016 +0900

----------------------------------------------------------------------
 .../storm/scheduler/IsolationScheduler.clj      | 232 ----------
 .../apache/storm/scheduler/EvenScheduler.java   |   9 +-
 .../storm/scheduler/IsolationScheduler.java     | 425 +++++++++++++++++++
 .../jvm/org/apache/storm/utils/Container.java   |  33 --
 4 files changed, 431 insertions(+), 268 deletions(-)
----------------------------------------------------------------------



[3/3] storm git commit: STORM-1239: CHANGELOG

Posted by ka...@apache.org.
STORM-1239: CHANGELOG


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/273fc4b7
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/273fc4b7
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/273fc4b7

Branch: refs/heads/master
Commit: 273fc4b747bb6ab9a10a147ecccb8b3da9fa0b2e
Parents: 33e7089
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sun Dec 25 19:32:36 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sun Dec 25 19:32:36 2016 +0900

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/273fc4b7/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index aeb4ad8..722f537 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 2.0.0
+ * STORM-1239: port backtype.storm.scheduler.IsolationScheduler to java
  * STORM-2217: Finish porting drpc to java
  * STORM-1308: port backtype.storm.tick-tuple-test to java
  * STORM-2245: integration-test constant compilation failure