Posted to dev@storm.apache.org by HeartSaVioR <gi...@git.apache.org> on 2017/09/25 13:23:21 UTC

[GitHub] storm pull request #2343: STORM-2083 Blacklist scheduler

GitHub user HeartSaVioR opened a pull request:

    https://github.com/apache/storm/pull/2343

    STORM-2083 Blacklist scheduler

    This is a rebased version of #1674 that addresses my latest review comments.
    
    Of course, the credit for this great work should go to @nilday. :)
    
    Since I can't give +1 to my own patch, I would really appreciate it if someone could review it and do some manual testing.
    
    Thanks in advance.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/HeartSaVioR/storm STORM-2083

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/2343.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2343
    
----
commit 9823af7b7ffa9d6552bbfc3a505a5f5245a74a30
Author: howard.li <ho...@vipshop.com>
Date:   2016-07-07T10:32:30Z

    STORM-2083: Blacklist scheduler
    
    Resolved conflict via Jungtaek Lim <ka...@gmail.com>
    
    This commit squashes the following commits into one; their commit messages were:
    
    1. add apache header
    2. rename method
    3. move config define to Config
    4. code style fix
    5. change debug log level to debug
    
    remove all blacklist enable config
    
    remove unused default value
    
    1. storm blacklist code style, header and other bugs
    2. wrap blacklist scheduler in nimbus and rebase to master
    
    change blacklist-scheduler schedule method log level.
    
    rename some variables and refactor badSlots args
    
    add blacklist.scheduler to default.yaml
    
    1. removeLongTimeDisappearFromCache bug fix
    2. add unit test for removeLongTimeDisappearFromCache
    3. change blacklistScheduler fields to protected so it can be visited from sub-class and unit tests
    
    1. remove CircularBuffer and replace it with guava EvictingQueue.
    2. modify nimbus_test.clj to adapt blacklistScheduler
    3. comments, Utils.getInt, DefaultBlacklistStrategy.prepare with conf

commit 71fdbbc29670534dbb4e9d6ecfcecb8c2fd8e69d
Author: Jungtaek Lim <ka...@gmail.com>
Date:   2017-09-25T13:16:48Z

    STORM-2083 Blacklist scheduler
    
    * address review comments from @HeartSaVioR and @revans2

----
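The squashed commit replaces a hand-rolled CircularBuffer with Guava's `EvictingQueue`, which the scheduler uses as a sliding window of per-round "bad supervisor" maps sized `toleranceTime / nimbusMonitorFreqSecs`. The sketch below illustrates that idea under stated assumptions: it uses a plain `ArrayDeque` bounded by hand instead of Guava (to stay dependency-free), and the class and method names (`BadSupervisorWindow`, `record`, `roundsSeen`) are hypothetical, not the PR's code. Rejecting a zero-sized window is a choice of this sketch only.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the EvictingQueue-based sliding window in the PR.
class BadSupervisorWindow {
    private final int capacity;
    private final Deque<Map<String, Set<Integer>>> rounds = new ArrayDeque<>();

    BadSupervisorWindow(int toleranceTimeSecs, int monitorFreqSecs) {
        // Same arithmetic as windowSize in the diff: one entry per scheduling round.
        this.capacity = toleranceTimeSecs / monitorFreqSecs;
        if (capacity <= 0) {
            throw new IllegalArgumentException("tolerance time must be >= monitor frequency");
        }
    }

    int capacity() {
        return capacity;
    }

    int size() {
        return rounds.size();
    }

    // Append one round's observations, evicting the oldest when full (EvictingQueue semantics).
    void record(Map<String, Set<Integer>> badSupervisorsThisRound) {
        if (rounds.size() == capacity) {
            rounds.removeFirst();
        }
        rounds.addLast(badSupervisorsThisRound);
    }

    // Number of rounds currently in the window in which the supervisor was reported bad.
    int roundsSeen(String supervisorId) {
        int count = 0;
        for (Map<String, Set<Integer>> round : rounds) {
            if (round.containsKey(supervisorId)) {
                count++;
            }
        }
        return count;
    }
}
```

With the 300-second default tolerance time shown in the diff and assuming a 10-second monitor frequency, the window holds 30 rounds; a supervisor counted in all of them matches the `value == windowSize` condition that `removeLongTimeDisappearFromCache` uses to evict it from the cache.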


---

[GitHub] storm pull request #2343: STORM-2083 Blacklist scheduler

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2343#discussion_r141189175
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/blacklist/strategies/DefaultBlacklistStrategy.java ---
    @@ -0,0 +1,169 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.scheduler.blacklist.strategies;
    +
    +import org.apache.storm.DaemonConfig;
    +import org.apache.storm.scheduler.Cluster;
    +import org.apache.storm.scheduler.SupervisorDetails;
    +import org.apache.storm.scheduler.Topologies;
    +import org.apache.storm.scheduler.TopologyDetails;
    +import org.apache.storm.scheduler.WorkerSlot;
    +import org.apache.storm.scheduler.blacklist.reporters.IReporter;
    +import org.apache.storm.scheduler.blacklist.reporters.LogReporter;
    +import org.apache.storm.utils.ObjectReader;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.TreeMap;
    +
    +public class DefaultBlacklistStrategy implements IBlacklistStrategy {
    +
    +    private static Logger LOG = LoggerFactory.getLogger(DefaultBlacklistStrategy.class);
    +
    +    public static final int DEFAULT_BLACKLIST_SCHEDULER_RESUME_TIME = 1800;
    +    public static final int DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_COUNT = 3;
    +
    +    private IReporter _reporter;
    --- End diff --
    
    Again, if we remove the '_' prefix, I think that would fix a lot of the checkstyle violations.


---

[GitHub] storm pull request #2343: STORM-2083 Blacklist scheduler

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2343#discussion_r141209476
  
    --- Diff: storm-server/src/main/java/org/apache/storm/DaemonConfig.java ---
    @@ -103,6 +110,38 @@
         public static final String STORM_SCHEDULER = "storm.scheduler";
     
         /**
    +     * The number of seconds that the blacklist scheduler will concern of bad slots or supervisors
    +     */
    +    @isInteger
    +    public static final String BLACKLIST_SCHEDULER_TOLERANCE_TIME = "blacklist.scheduler.tolerance.time.secs";
    +
    +    /**
    +     * The number of hit count that will trigger blacklist in tolerance time
    +     */
    +    @isInteger
    --- End diff --
    
    A negative count doesn't make sense. I'll change it to `@isPositiveNumber`.


---

[GitHub] storm pull request #2343: STORM-2083 Blacklist scheduler

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2343#discussion_r141209321
  
    --- Diff: storm-server/pom.xml ---
    @@ -136,7 +136,7 @@
                     <artifactId>maven-checkstyle-plugin</artifactId>
                     <!--Note - the version would be inherited-->
                     <configuration>
    -                    <maxAllowedViolations>3626</maxAllowedViolations>
    +                    <maxAllowedViolations>4000</maxAllowedViolations>
    --- End diff --
    
    Ah yes, I increased the number while rebasing and planned to fix it afterwards, but forgot. I'll address it. Thanks for noticing.


---

[GitHub] storm pull request #2343: STORM-2083 Blacklist scheduler

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2343#discussion_r141187945
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java ---
    @@ -0,0 +1,248 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.scheduler.blacklist;
    +
    +import com.google.common.collect.EvictingQueue;
    +import org.apache.storm.DaemonConfig;
    +import org.apache.storm.metric.StormMetricsRegistry;
    +import org.apache.storm.scheduler.Cluster;
    +import org.apache.storm.scheduler.IScheduler;
    +import org.apache.storm.scheduler.SupervisorDetails;
    +import org.apache.storm.scheduler.Topologies;
    +import org.apache.storm.scheduler.WorkerSlot;
    +import org.apache.storm.scheduler.blacklist.reporters.IReporter;
    +import org.apache.storm.scheduler.blacklist.reporters.LogReporter;
    +import org.apache.storm.scheduler.blacklist.strategies.DefaultBlacklistStrategy;
    +import org.apache.storm.scheduler.blacklist.strategies.IBlacklistStrategy;
    +import org.apache.storm.utils.ObjectReader;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.Callable;
    +
    +public class BlacklistScheduler implements IScheduler {
    +    private static final Logger LOG = LoggerFactory.getLogger(BlacklistScheduler.class);
    +
    +    public static final int DEFAULT_BLACKLIST_SCHEDULER_RESUME_TIME = 1800;
    +    public static final int DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_COUNT = 3;
    +    public static final int DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_TIME = 300;
    +
    +    private final IScheduler underlyingScheduler;
    +    @SuppressWarnings("rawtypes")
    +    private Map _conf;
    --- End diff --
    
    Can we change this to `conf`? That will fix one of the violations.


---

[GitHub] storm issue #2343: STORM-2083 Blacklist scheduler

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/2343
  
    @revans2 Thanks for reviewing. I've addressed your comments and rebased onto the latest master.


---

[GitHub] storm pull request #2343: STORM-2083 Blacklist scheduler

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2343#discussion_r141220584
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/blacklist/Sets.java ---
    @@ -0,0 +1,58 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.scheduler.blacklist;
    +
    +import java.util.HashSet;
    +import java.util.Set;
    +
    +public class Sets {
    --- End diff --
    
    Filed STORM-2762: https://issues.apache.org/jira/browse/STORM-2762


---

[GitHub] storm pull request #2343: STORM-2083 Blacklist scheduler

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2343#discussion_r141187500
  
    --- Diff: storm-server/src/main/java/org/apache/storm/DaemonConfig.java ---
    @@ -103,6 +110,38 @@
         public static final String STORM_SCHEDULER = "storm.scheduler";
     
         /**
    +     * The number of seconds that the blacklist scheduler will concern of bad slots or supervisors
    +     */
    +    @isInteger
    +    public static final String BLACKLIST_SCHEDULER_TOLERANCE_TIME = "blacklist.scheduler.tolerance.time.secs";
    +
    +    /**
    +     * The number of hit count that will trigger blacklist in tolerance time
    +     */
    +    @isInteger
    --- End diff --
    
    Is a negative count allowed? If not, we should also mark it as `@isPositiveNumber`.
    
    The same goes for tolerance time and resume time.
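The point here is that counts and durations are only meaningful when strictly positive. As an illustration only, the sketch below shows the kind of check a positive-number validation enforces when such a value is read with a default. `PositiveConfig.requirePositive` is a hypothetical helper: Storm's real `@isPositiveNumber` annotation is processed by its own config-validation framework, not by this code.

```java
import java.util.Map;

// Hypothetical helper mirroring what a positive-number config check enforces.
final class PositiveConfig {
    private PositiveConfig() {}

    static int requirePositive(Map<String, Object> conf, String key, int defaultValue) {
        Object raw = conf.get(key);
        if (raw == null) {
            // Missing key: fall back to the default, as the scheduler does with its DEFAULT_* constants.
            return defaultValue;
        }
        if (!(raw instanceof Number)) {
            throw new IllegalArgumentException(key + " must be a number, got " + raw);
        }
        Number n = (Number) raw;
        // Zero and negative values are rejected: a non-positive count or duration has no meaning here.
        if (n.doubleValue() <= 0) {
            throw new IllegalArgumentException(key + " must be positive, got " + n);
        }
        return n.intValue();
    }
}
```

Validating up front means a bad value such as a negative tolerance time fails at startup, rather than silently producing a nonsensical window size later.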


---

[GitHub] storm pull request #2343: STORM-2083 Blacklist scheduler

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2343#discussion_r141219408
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/blacklist/Sets.java ---
    @@ -0,0 +1,58 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.scheduler.blacklist;
    +
    +import java.util.HashSet;
    +import java.util.Set;
    +
    +public class Sets {
    --- End diff --
    
    OK, I'll file an issue. Thanks for pointing it out.


---

[GitHub] storm pull request #2343: STORM-2083 Blacklist scheduler

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2343#discussion_r141188960
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/blacklist/Sets.java ---
    @@ -0,0 +1,58 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.scheduler.blacklist;
    +
    +import java.util.HashSet;
    +import java.util.Set;
    +
    +public class Sets {
    --- End diff --
    
    We do set logic in a number of places. Could you file a follow-on JIRA for someone to go through and consolidate it into common code?
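The PR's `Sets` class backs calls in the scheduler diff such as `Sets.difference(cachedSupervisorsKeySet, supervisorsKeySet)` and `Sets.union(newPorts, cachedSupervisorPorts)`. Its body isn't shown in this excerpt, so the sketch below is a hypothetical generic helper (`SetUtil`), assuming each operation returns a fresh mutable copy. Guava also ships `com.google.common.collect.Sets.difference`/`union`, but those return unmodifiable live views.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical shared set utilities; each method returns a new mutable HashSet
// and leaves its arguments untouched.
final class SetUtil {
    private SetUtil() {}

    // Elements in a that are not in b.
    static <T> Set<T> difference(Set<? extends T> a, Set<? extends T> b) {
        Set<T> result = new HashSet<>(a);
        result.removeAll(b);
        return result;
    }

    // Elements in a or b.
    static <T> Set<T> union(Set<? extends T> a, Set<? extends T> b) {
        Set<T> result = new HashSet<>(a);
        result.addAll(b);
        return result;
    }

    // Elements in both a and b.
    static <T> Set<T> intersection(Set<? extends T> a, Set<? extends T> b) {
        Set<T> result = new HashSet<>(a);
        result.retainAll(b);
        return result;
    }
}
```

Returning copies matters in this scheduler: `removeLongTimeDisappearFromCache` calls `slots.remove(slot)` on a set that may have been stored via `union`, and an unmodifiable view would throw `UnsupportedOperationException` there.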


---

[GitHub] storm issue #2343: STORM-2083 Blacklist scheduler

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/2343
  
    Testing worked really well.  I am +1 once my nits are addressed.


---

[GitHub] storm pull request #2343: STORM-2083 Blacklist scheduler

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2343#discussion_r141209841
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java ---
    @@ -0,0 +1,248 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.scheduler.blacklist;
    +
    +import com.google.common.collect.EvictingQueue;
    +import org.apache.storm.DaemonConfig;
    +import org.apache.storm.metric.StormMetricsRegistry;
    +import org.apache.storm.scheduler.Cluster;
    +import org.apache.storm.scheduler.IScheduler;
    +import org.apache.storm.scheduler.SupervisorDetails;
    +import org.apache.storm.scheduler.Topologies;
    +import org.apache.storm.scheduler.WorkerSlot;
    +import org.apache.storm.scheduler.blacklist.reporters.IReporter;
    +import org.apache.storm.scheduler.blacklist.reporters.LogReporter;
    +import org.apache.storm.scheduler.blacklist.strategies.DefaultBlacklistStrategy;
    +import org.apache.storm.scheduler.blacklist.strategies.IBlacklistStrategy;
    +import org.apache.storm.utils.ObjectReader;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.Callable;
    +
    +public class BlacklistScheduler implements IScheduler {
    +    private static final Logger LOG = LoggerFactory.getLogger(BlacklistScheduler.class);
    +
    +    public static final int DEFAULT_BLACKLIST_SCHEDULER_RESUME_TIME = 1800;
    +    public static final int DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_COUNT = 3;
    +    public static final int DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_TIME = 300;
    +
    +    private final IScheduler underlyingScheduler;
    +    @SuppressWarnings("rawtypes")
    +    private Map _conf;
    +
    +    protected int toleranceTime;
    +    protected int toleranceCount;
    +    protected int resumeTime;
    +    protected IReporter reporter;
    +    protected IBlacklistStrategy blacklistStrategy;
    +
    +    protected int nimbusMonitorFreqSecs;
    +
    +    protected Map<String, Set<Integer>> cachedSupervisors;
    +
    +    //key is supervisor key ,value is supervisor ports
    +    protected EvictingQueue<HashMap<String, Set<Integer>>> badSupervisorsToleranceSlidingWindow;
    +    protected int windowSize;
    +    protected Set<String> blacklistHost;
    +
    +    public BlacklistScheduler(IScheduler underlyingScheduler) {
    +        this.underlyingScheduler = underlyingScheduler;
    +    }
    +
    +    @Override
    +    public void prepare(Map conf) {
    +        LOG.info("Preparing black list scheduler");
    +        underlyingScheduler.prepare(conf);
    +        _conf = conf;
    +
    +        toleranceTime = ObjectReader.getInt(_conf.get(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_TIME), DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_TIME);
    +        toleranceCount = ObjectReader.getInt( _conf.get(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT), DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_COUNT);
    +        resumeTime = ObjectReader.getInt( _conf.get(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME), DEFAULT_BLACKLIST_SCHEDULER_RESUME_TIME);
    +
    +        String reporterClassName = ObjectReader.getString(_conf.get(DaemonConfig.BLACKLIST_SCHEDULER_REPORTER),
    +                LogReporter.class.getName());
    +        reporter = (IReporter) initializeInstance(reporterClassName, "blacklist reporter");
    +
    +        String strategyClassName = ObjectReader.getString(_conf.get(DaemonConfig.BLACKLIST_SCHEDULER_STRATEGY),
    +                DefaultBlacklistStrategy.class.getName());
    +        blacklistStrategy = (IBlacklistStrategy) initializeInstance(strategyClassName, "blacklist strategy");
    +
    +        nimbusMonitorFreqSecs = ObjectReader.getInt( _conf.get(DaemonConfig.NIMBUS_MONITOR_FREQ_SECS));
    +        blacklistStrategy.prepare(_conf);
    +
    +        windowSize = toleranceTime / nimbusMonitorFreqSecs;
    +        badSupervisorsToleranceSlidingWindow = EvictingQueue.create(windowSize);
    +        cachedSupervisors = new HashMap<>();
    +        blacklistHost = new HashSet<>();
    +
    +        StormMetricsRegistry.registerGauge("nimbus:num-blacklisted-supervisor", new Callable<Integer>() {
    +            @Override
    +            public Integer call() throws Exception {
    +                //nimbus:num-blacklisted-supervisor + none blacklisted supervisor = nimbus:num-supervisors
    +                return blacklistHost.size();
    +            }
    +        });
    +    }
    +
    +    @Override
    +    public void schedule(Topologies topologies, Cluster cluster) {
    +        LOG.debug("running Black List scheduler");
    +        Map<String, SupervisorDetails> supervisors = cluster.getSupervisors();
    +        LOG.debug("AssignableSlots: {}", cluster.getAssignableSlots());
    +        LOG.debug("AvailableSlots: {}", cluster.getAvailableSlots());
    +        LOG.debug("UsedSlots: {}", cluster.getUsedSlots());
    +
    +        blacklistStrategy.resumeFromBlacklist();
    +        badSupervisors(supervisors);
    +        Set<String> blacklistHosts = getBlacklistHosts(cluster, topologies);
    +        this.blacklistHost = blacklistHosts;
    +        cluster.setBlacklistedHosts(blacklistHosts);
    +        removeLongTimeDisappearFromCache();
    +
    +        underlyingScheduler.schedule(topologies, cluster);
    +    }
    +
    +    @Override
    +    public Map<String, Object> config() {
    +        return underlyingScheduler.config();
    +    }
    +
    +    private void badSupervisors(Map<String, SupervisorDetails> supervisors) {
    +        Set<String> cachedSupervisorsKeySet = cachedSupervisors.keySet();
    +        Set<String> supervisorsKeySet = supervisors.keySet();
    +
    +        Set<String> badSupervisorKeys = Sets.difference(cachedSupervisorsKeySet, supervisorsKeySet); //cached supervisor doesn't show up
    +        HashMap<String, Set<Integer>> badSupervisors = new HashMap<>();
    +        for (String key : badSupervisorKeys) {
    +            badSupervisors.put(key, cachedSupervisors.get(key));
    +        }
    +
    +        for (Map.Entry<String, SupervisorDetails> entry : supervisors.entrySet()) {
    +            String key = entry.getKey();
    +            SupervisorDetails supervisorDetails = entry.getValue();
    +            if (cachedSupervisors.containsKey(key)) {
    +                Set<Integer> badSlots = badSlots(supervisorDetails, key);
    +                if (badSlots.size() > 0) { //supervisor contains bad slots
    +                    badSupervisors.put(key, badSlots);
    +                }
    +            } else {
    +                cachedSupervisors.put(key, supervisorDetails.getAllPorts()); //new supervisor to cache
    +            }
    +        }
    +
    +        badSupervisorsToleranceSlidingWindow.add(badSupervisors);
    +    }
    +
    +    private Set<Integer> badSlots(SupervisorDetails supervisor, String supervisorKey) {
    +        Set<Integer> cachedSupervisorPorts = cachedSupervisors.get(supervisorKey);
    +        Set<Integer> supervisorPorts = supervisor.getAllPorts();
    +
    +        Set<Integer> newPorts = Sets.difference(supervisorPorts, cachedSupervisorPorts);
    +        if (newPorts.size() > 0) {
    +            //add new ports to cached supervisor
    +            cachedSupervisors.put(supervisorKey, Sets.union(newPorts, cachedSupervisorPorts));
    +        }
    +
    +        Set<Integer> badSlots = Sets.difference(cachedSupervisorPorts, supervisorPorts);
    +        return badSlots;
    +    }
    +
    +    private Set<String> getBlacklistHosts(Cluster cluster, Topologies topologies) {
    +        Set<String> blacklistSet = blacklistStrategy.getBlacklist(new ArrayList<>(badSupervisorsToleranceSlidingWindow), cluster, topologies);
    +        Set<String> blacklistHostSet = new HashSet<>();
    +        for (String supervisor : blacklistSet) {
    +            String host = cluster.getHost(supervisor);
    +            if (host != null) {
    +                blacklistHostSet.add(host);
    +            } else {
    +                LOG.info("supervisor {} is not alive, do not need to add to blacklist.", supervisor);
    +            }
    +        }
    +        return blacklistHostSet;
    +    }
    +
    +    /**
    +     * supervisor or port never exits once in tolerance time will be removed from cache.
    +     */
    +    private void removeLongTimeDisappearFromCache() {
    +
    +        Map<String, Integer> supervisorCountMap = new HashMap<String, Integer>();
    +        Map<WorkerSlot, Integer> slotCountMap = new HashMap<WorkerSlot, Integer>();
    +
    +        for (Map<String, Set<Integer>> item : badSupervisorsToleranceSlidingWindow) {
    +            Set<String> supervisors = item.keySet();
    +            for (String supervisor : supervisors) {
    +                int supervisorCount = supervisorCountMap.getOrDefault(supervisor, 0);
    +                Set<Integer> slots = item.get(supervisor);
    +                if (slots.equals(cachedSupervisors.get(supervisor))) { // treat supervisor as bad only if all slots are bad
    +                    supervisorCountMap.put(supervisor, supervisorCount + 1);
    +                }
    +                for (Integer slot : slots) {
    +                    WorkerSlot workerSlot = new WorkerSlot(supervisor, slot);
    +                    int slotCount = slotCountMap.getOrDefault(workerSlot, 0);
    +                    slotCountMap.put(workerSlot, slotCount + 1);
    +                }
    +            }
    +        }
    +
    +        for (Map.Entry<String, Integer> entry : supervisorCountMap.entrySet()) {
    +            String key = entry.getKey();
    +            int value = entry.getValue();
    +            if (value == windowSize) { // supervisor which was never back to normal in tolerance period will be removed from cache
    +                cachedSupervisors.remove(key);
    +                LOG.info("Supervisor {} was never back to normal during tolerance period, probably dead. Will remove from cache.", key);
    +            }
    +        }
    +
    +        for (Map.Entry<WorkerSlot, Integer> entry : slotCountMap.entrySet()) {
    +            WorkerSlot workerSlot = entry.getKey();
    +            String supervisorKey = workerSlot.getNodeId();
    +            Integer slot = workerSlot.getPort();
    +            int value = entry.getValue();
    +            if (value == windowSize) { // worker slot which was never back to normal in tolerance period will be removed from cache
    +                Set<Integer> slots = cachedSupervisors.get(supervisorKey);
    +                if (slots != null) { // slots will be null while supervisor has been removed from cached supervisors
    +                    slots.remove(slot);
    +                    cachedSupervisors.put(supervisorKey, slots);
    +                }
    +                LOG.info("Worker slot {} was never back to normal during tolerance period, probably dead. Will be removed from cache.", workerSlot);
    +            }
    +        }
    +    }
    +
    +    private Object initializeInstance(String className, String representation) {
    --- End diff --
    
    The refactoring is mainly intended to remove duplicated long `catch` blocks. By the way, I'll change the line below to use Utils.
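`initializeInstance(String className, String representation)` exists so that each pluggable component (the reporter and the strategy) doesn't need its own chain of `catch` blocks. Its body is cut off in the diff above. The sketch below shows the general pattern, assuming plain reflection and a single `catch` on `ReflectiveOperationException`. The `Instantiator` class and its exception wrapping are hypothetical, and this is not the `Utils` helper the comment says the line will switch to.

```java
// Hypothetical sketch: one reflective factory shared by every pluggable component.
final class Instantiator {
    private Instantiator() {}

    static <T> T newInstance(String className, Class<T> expectedType, String representation) {
        try {
            Class<?> clazz = Class.forName(className);
            Object instance = clazz.getDeclaredConstructor().newInstance();
            // cast() throws ClassCastException right away if the class is not an expectedType.
            return expectedType.cast(instance);
        } catch (ReflectiveOperationException e) {
            // One handler covers ClassNotFoundException, NoSuchMethodException,
            // InstantiationException, IllegalAccessException and InvocationTargetException.
            throw new IllegalStateException(
                "Failed to instantiate " + representation + ": " + className, e);
        }
    }
}
```

A call site would then read `IReporter reporter = Instantiator.newInstance(reporterClassName, IReporter.class, "blacklist reporter");`. Passing the expected `Class<T>` also removes the unchecked `(IReporter)` cast from `prepare`.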


---

[GitHub] storm pull request #2343: STORM-2083 Blacklist scheduler

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/2343


---

[GitHub] storm pull request #2343: STORM-2083 Blacklist scheduler

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2343#discussion_r141194357
  
    --- Diff: storm-server/pom.xml ---
    @@ -136,7 +136,7 @@
                     <artifactId>maven-checkstyle-plugin</artifactId>
                     <!--Note - the version would be inherited-->
                     <configuration>
    -                    <maxAllowedViolations>3626</maxAllowedViolations>
    +                    <maxAllowedViolations>4000</maxAllowedViolations>
    --- End diff --
    
    It is actually 3679 violations (not 4000). 


---

[GitHub] storm pull request #2343: STORM-2083 Blacklist scheduler

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2343#discussion_r141188696
  
    --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java ---
    @@ -0,0 +1,248 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.scheduler.blacklist;
    +
    +import com.google.common.collect.EvictingQueue;
    +import org.apache.storm.DaemonConfig;
    +import org.apache.storm.metric.StormMetricsRegistry;
    +import org.apache.storm.scheduler.Cluster;
    +import org.apache.storm.scheduler.IScheduler;
    +import org.apache.storm.scheduler.SupervisorDetails;
    +import org.apache.storm.scheduler.Topologies;
    +import org.apache.storm.scheduler.WorkerSlot;
    +import org.apache.storm.scheduler.blacklist.reporters.IReporter;
    +import org.apache.storm.scheduler.blacklist.reporters.LogReporter;
    +import org.apache.storm.scheduler.blacklist.strategies.DefaultBlacklistStrategy;
    +import org.apache.storm.scheduler.blacklist.strategies.IBlacklistStrategy;
    +import org.apache.storm.utils.ObjectReader;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.Callable;
    +
    +public class BlacklistScheduler implements IScheduler {
    +    private static final Logger LOG = LoggerFactory.getLogger(BlacklistScheduler.class);
    +
    +    public static final int DEFAULT_BLACKLIST_SCHEDULER_RESUME_TIME = 1800;
    +    public static final int DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_COUNT = 3;
    +    public static final int DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_TIME = 300;
    +
    +    private final IScheduler underlyingScheduler;
    +    @SuppressWarnings("rawtypes")
    +    private Map _conf;
    +
    +    protected int toleranceTime;
    +    protected int toleranceCount;
    +    protected int resumeTime;
    +    protected IReporter reporter;
    +    protected IBlacklistStrategy blacklistStrategy;
    +
    +    protected int nimbusMonitorFreqSecs;
    +
    +    protected Map<String, Set<Integer>> cachedSupervisors;
    +
    +    // one entry per scheduling round: key is the supervisor key, value is that supervisor's bad ports
    +    protected EvictingQueue<HashMap<String, Set<Integer>>> badSupervisorsToleranceSlidingWindow;
    +    protected int windowSize;
    +    protected Set<String> blacklistHost;
    +
    +    public BlacklistScheduler(IScheduler underlyingScheduler) {
    +        this.underlyingScheduler = underlyingScheduler;
    +    }
    +
    +    @Override
    +    public void prepare(Map conf) {
    +        LOG.info("Preparing black list scheduler");
    +        underlyingScheduler.prepare(conf);
    +        _conf = conf;
    +
    +        toleranceTime = ObjectReader.getInt(_conf.get(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_TIME), DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_TIME);
    +        toleranceCount = ObjectReader.getInt(_conf.get(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT), DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_COUNT);
    +        resumeTime = ObjectReader.getInt(_conf.get(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME), DEFAULT_BLACKLIST_SCHEDULER_RESUME_TIME);
    +
    +        String reporterClassName = ObjectReader.getString(_conf.get(DaemonConfig.BLACKLIST_SCHEDULER_REPORTER),
    +                LogReporter.class.getName());
    +        reporter = (IReporter) initializeInstance(reporterClassName, "blacklist reporter");
    +
    +        String strategyClassName = ObjectReader.getString(_conf.get(DaemonConfig.BLACKLIST_SCHEDULER_STRATEGY),
    +                DefaultBlacklistStrategy.class.getName());
    +        blacklistStrategy = (IBlacklistStrategy) initializeInstance(strategyClassName, "blacklist strategy");
    +
    +        nimbusMonitorFreqSecs = ObjectReader.getInt(_conf.get(DaemonConfig.NIMBUS_MONITOR_FREQ_SECS));
    +        blacklistStrategy.prepare(_conf);
    +
    +        windowSize = toleranceTime / nimbusMonitorFreqSecs;
    +        badSupervisorsToleranceSlidingWindow = EvictingQueue.create(windowSize);
    +        cachedSupervisors = new HashMap<>();
    +        blacklistHost = new HashSet<>();
    +
    +        StormMetricsRegistry.registerGauge("nimbus:num-blacklisted-supervisor", new Callable<Integer>() {
    +            @Override
    +            public Integer call() throws Exception {
    +                // nimbus:num-blacklisted-supervisor + non-blacklisted supervisors = nimbus:num-supervisors
    +                return blacklistHost.size();
    +            }
    +        });
    +    }
    +
    +    @Override
    +    public void schedule(Topologies topologies, Cluster cluster) {
    +        LOG.debug("running Black List scheduler");
    +        Map<String, SupervisorDetails> supervisors = cluster.getSupervisors();
    +        LOG.debug("AssignableSlots: {}", cluster.getAssignableSlots());
    +        LOG.debug("AvailableSlots: {}", cluster.getAvailableSlots());
    +        LOG.debug("UsedSlots: {}", cluster.getUsedSlots());
    +
    +        blacklistStrategy.resumeFromBlacklist();
    +        badSupervisors(supervisors);
    +        Set<String> blacklistHosts = getBlacklistHosts(cluster, topologies);
    +        this.blacklistHost = blacklistHosts;
    +        cluster.setBlacklistedHosts(blacklistHosts);
    +        removeLongTimeDisappearFromCache();
    +
    +        underlyingScheduler.schedule(topologies, cluster);
    +    }
    +
    +    @Override
    +    public Map<String, Object> config() {
    +        return underlyingScheduler.config();
    +    }
    +
    +    private void badSupervisors(Map<String, SupervisorDetails> supervisors) {
    +        Set<String> cachedSupervisorsKeySet = cachedSupervisors.keySet();
    +        Set<String> supervisorsKeySet = supervisors.keySet();
    +
    +        Set<String> badSupervisorKeys = Sets.difference(cachedSupervisorsKeySet, supervisorsKeySet); // cached supervisors that no longer show up
    +        HashMap<String, Set<Integer>> badSupervisors = new HashMap<>();
    +        for (String key : badSupervisorKeys) {
    +            badSupervisors.put(key, cachedSupervisors.get(key));
    +        }
    +
    +        for (Map.Entry<String, SupervisorDetails> entry : supervisors.entrySet()) {
    +            String key = entry.getKey();
    +            SupervisorDetails supervisorDetails = entry.getValue();
    +            if (cachedSupervisors.containsKey(key)) {
    +                Set<Integer> badSlots = badSlots(supervisorDetails, key);
    +                if (badSlots.size() > 0) { //supervisor contains bad slots
    +                    badSupervisors.put(key, badSlots);
    +                }
    +            } else {
    +                cachedSupervisors.put(key, supervisorDetails.getAllPorts()); //new supervisor to cache
    +            }
    +        }
    +
    +        badSupervisorsToleranceSlidingWindow.add(badSupervisors);
    +    }
    +
    +    private Set<Integer> badSlots(SupervisorDetails supervisor, String supervisorKey) {
    +        Set<Integer> cachedSupervisorPorts = cachedSupervisors.get(supervisorKey);
    +        Set<Integer> supervisorPorts = supervisor.getAllPorts();
    +
    +        Set<Integer> newPorts = Sets.difference(supervisorPorts, cachedSupervisorPorts);
    +        if (newPorts.size() > 0) {
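    +            // add new ports to the cached supervisor; copy into a mutable HashSet because Sets.union returns
    +            // an unmodifiable view, and removeLongTimeDisappearFromCache() later calls remove() on cached entries
    +            cachedSupervisors.put(supervisorKey, new HashSet<>(Sets.union(newPorts, cachedSupervisorPorts)));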
    +        }
    +
    +        Set<Integer> badSlots = Sets.difference(cachedSupervisorPorts, supervisorPorts);
    +        return badSlots;
    +    }
    +
    +    private Set<String> getBlacklistHosts(Cluster cluster, Topologies topologies) {
    +        Set<String> blacklistSet = blacklistStrategy.getBlacklist(new ArrayList<>(badSupervisorsToleranceSlidingWindow), cluster, topologies);
    +        Set<String> blacklistHostSet = new HashSet<>();
    +        for (String supervisor : blacklistSet) {
    +            String host = cluster.getHost(supervisor);
    +            if (host != null) {
    +                blacklistHostSet.add(host);
    +            } else {
    +                LOG.info("supervisor {} is not alive, do not need to add to blacklist.", supervisor);
    +            }
    +        }
    +        return blacklistHostSet;
    +    }
    +
    +    /**
    +     * Removes from the cache any supervisor or port that was bad in every round of the tolerance window, i.e. never came back during the tolerance time.
    +     */
    +    private void removeLongTimeDisappearFromCache() {
    +
    +        Map<String, Integer> supervisorCountMap = new HashMap<String, Integer>();
    +        Map<WorkerSlot, Integer> slotCountMap = new HashMap<WorkerSlot, Integer>();
    +
    +        for (Map<String, Set<Integer>> item : badSupervisorsToleranceSlidingWindow) {
    +            Set<String> supervisors = item.keySet();
    +            for (String supervisor : supervisors) {
    +                int supervisorCount = supervisorCountMap.getOrDefault(supervisor, 0);
    +                Set<Integer> slots = item.get(supervisor);
    +                if (slots.equals(cachedSupervisors.get(supervisor))) { // treat supervisor as bad only if all slots are bad
    +                    supervisorCountMap.put(supervisor, supervisorCount + 1);
    +                }
    +                for (Integer slot : slots) {
    +                    WorkerSlot workerSlot = new WorkerSlot(supervisor, slot);
    +                    int slotCount = slotCountMap.getOrDefault(workerSlot, 0);
    +                    slotCountMap.put(workerSlot, slotCount + 1);
    +                }
    +            }
    +        }
    +
    +        for (Map.Entry<String, Integer> entry : supervisorCountMap.entrySet()) {
    +            String key = entry.getKey();
    +            int value = entry.getValue();
    +            if (value == windowSize) { // a supervisor that never returned to normal during the tolerance period is removed from the cache
    +                cachedSupervisors.remove(key);
    +                LOG.info("Supervisor {} was never back to normal during tolerance period, probably dead. Will remove from cache.", key);
    +            }
    +        }
    +
    +        for (Map.Entry<WorkerSlot, Integer> entry : slotCountMap.entrySet()) {
    +            WorkerSlot workerSlot = entry.getKey();
    +            String supervisorKey = workerSlot.getNodeId();
    +            Integer slot = workerSlot.getPort();
    +            int value = entry.getValue();
    +            if (value == windowSize) { // a worker slot that never returned to normal during the tolerance period is removed from the cache
    +                Set<Integer> slots = cachedSupervisors.get(supervisorKey);
    +                if (slots != null) { // slots is null if the supervisor itself has already been removed from cachedSupervisors
    +                    slots.remove(slot);
    +                    cachedSupervisors.put(supervisorKey, slots);
    +                }
    +                LOG.info("Worker slot {} was never back to normal during tolerance period, probably dead. Will be removed from cache.", workerSlot);
    +            }
    +        }
    +    }
    +
    +    private Object initializeInstance(String className, String representation) {
    --- End diff --
    
    I think there is a function in Utils or ServerUtils that can do this for us.
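For illustration, a reflection-based helper of the kind this comment refers to typically loads a class by name, checks it against the expected interface, and calls its public no-arg constructor. The sketch below is self-contained; `ReflectiveFactory` and `newInstance` are hypothetical names, not the actual Storm utility, whose exact name and location are not confirmed here.

```java
/** Hypothetical helper; not the actual Storm Utils/ServerUtils API. */
final class ReflectiveFactory {
    private ReflectiveFactory() {
    }

    /**
     * Loads className, verifies it is a subtype of expectedType, and invokes
     * its no-arg constructor. Any failure is reported as IllegalArgumentException.
     */
    static <T> T newInstance(String className, Class<T> expectedType) {
        try {
            Class<?> clazz = Class.forName(className);
            // Fail fast with a readable message instead of a later ClassCastException.
            if (!expectedType.isAssignableFrom(clazz)) {
                throw new IllegalArgumentException(
                        className + " does not implement " + expectedType.getName());
            }
            return expectedType.cast(clazz.getDeclaredConstructor().newInstance());
        } catch (ReflectiveOperationException e) {
            // Covers ClassNotFound, NoSuchMethod, Instantiation, IllegalAccess, InvocationTarget.
            throw new IllegalArgumentException("Cannot instantiate " + className, e);
        }
    }
}
```

Assuming a helper like this, `prepare` could read `reporter = ReflectiveFactory.newInstance(reporterClassName, IReporter.class);`, which drops the explicit cast and reports a misconfigured class name with a clear error.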

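The tolerance logic in the diff keeps one map of bad supervisors per scheduling round in a bounded queue of `toleranceTime / nimbusMonitorFreqSecs` entries, and drops a supervisor from the cache only when all of its cached ports were bad in every round of a full window. A minimal sketch of that counting, assuming plain `java.util` collections in place of Guava's `EvictingQueue` (the `ToleranceWindow` class is hypothetical):

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for the EvictingQueue-based sliding window. */
final class ToleranceWindow {
    private final int windowSize;
    private final Deque<Map<String, Set<Integer>>> rounds = new ArrayDeque<>();

    ToleranceWindow(int toleranceTimeSecs, int monitorFreqSecs) {
        // Same arithmetic as prepare(): number of rounds covered by the tolerance time.
        this.windowSize = toleranceTimeSecs / monitorFreqSecs;
    }

    int windowSize() {
        return windowSize;
    }

    /** Records one round's bad supervisors, evicting the oldest round when full. */
    void addRound(Map<String, Set<Integer>> badSupervisors) {
        if (windowSize == 0) {
            return; // a zero-capacity window never retains anything
        }
        if (rounds.size() == windowSize) {
            rounds.removeFirst();
        }
        rounds.addLast(badSupervisors);
    }

    /** Supervisors whose whole cached port set was bad in every round of the window. */
    Set<String> deadSupervisors(Map<String, Set<Integer>> cachedPorts) {
        Map<String, Integer> counts = new HashMap<>();
        for (Map<String, Set<Integer>> round : rounds) {
            for (Map.Entry<String, Set<Integer>> e : round.entrySet()) {
                // Count a round only if all cached ports of that supervisor were bad.
                if (e.getValue().equals(cachedPorts.get(e.getKey()))) {
                    counts.merge(e.getKey(), 1, Integer::sum);
                }
            }
        }
        Set<String> dead = new HashSet<>();
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            if (e.getValue() == windowSize) {
                dead.add(e.getKey());
            }
        }
        return dead;
    }
}
```

With the default tolerance time of 300 s from the diff and, say, a 10 s monitor frequency, the window would hold 30 rounds, so a supervisor has to be fully missing for about five minutes before it leaves the cache. The diff applies the same per-round count to individual `WorkerSlot`s.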

---

[GitHub] storm pull request #2343: STORM-2083 Blacklist scheduler

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2343#discussion_r141187231
  
    --- Diff: storm-server/pom.xml ---
    @@ -136,7 +136,7 @@
                     <artifactId>maven-checkstyle-plugin</artifactId>
                     <!--Note - the version would be inherited-->
                     <configuration>
    -                    <maxAllowedViolations>3626</maxAllowedViolations>
    +                    <maxAllowedViolations>4000</maxAllowedViolations>
    --- End diff --
    
    Can we try and fix the violations instead of boosting the number?


---