Posted to dev@storm.apache.org by revans2 <gi...@git.apache.org> on 2018/09/15 20:11:39 UTC

[GitHub] storm pull request #2836: STORM-3162: Cleanup heartbeats cache and make it t...

GitHub user revans2 opened a pull request:

    https://github.com/apache/storm/pull/2836

    STORM-3162: Cleanup heartbeats cache and make it thread safe

    This is an alternative to #2800 
    
    @zd-project I had real trouble understanding what was happening with the heartbeat cache, because the code was spread across so many files that I couldn't be sure whether #2800 would fix the issues.
    
    I decided to refactor the code so that I could understand how it worked and to give the cache a clean interface. Once I had that, it became clear to me that #2800 would fix the immediate issue, but that there were other races in the code, especially in the per-executor caches. These are probably not a big deal, but I felt it would be best to fix them too.
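A minimal sketch of the locking pattern the new HeartbeatCache relies on, to illustrate the point about races in the per-executor caches: the outer and inner maps are ConcurrentHashMaps populated with computeIfAbsent, which is atomic, so concurrent callers always share one entry per executor, and each entry guards its own mutable fields with synchronized methods. A get-then-put on a plain HashMap would instead let two threads create separate entries and lose an update. BeatEntry and TopoBeatCache are hypothetical names; this is a simplification, not the code from the PR.

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified per-executor entry: every read and write of the
// mutable fields goes through this entry's own monitor.
final class BeatEntry {
    private int reportedTime;
    private int updates;

    synchronized void update(int newReportedTime) {
        reportedTime = newReportedTime;
        updates++;
    }

    synchronized int getReportedTime() {
        return reportedTime;
    }

    synchronized int getUpdates() {
        return updates;
    }
}

final class TopoBeatCache {
    // topology id -> executor [start task, end task] -> entry
    private final ConcurrentHashMap<String, ConcurrentHashMap<List<Integer>, BeatEntry>> cache =
        new ConcurrentHashMap<>();

    void record(String topoId, List<Integer> executor, int reportedTime) {
        // computeIfAbsent is atomic on ConcurrentHashMap, so threads racing on a new
        // topology or executor always end up sharing the same inner map and entry.
        cache.computeIfAbsent(topoId, k -> new ConcurrentHashMap<>())
             .computeIfAbsent(executor, k -> new BeatEntry())
             .update(reportedTime);
    }

    int updatesFor(String topoId, List<Integer> executor) {
        ConcurrentHashMap<List<Integer>, BeatEntry> topo = cache.get(topoId);
        BeatEntry entry = topo == null ? null : topo.get(executor);
        return entry == null ? 0 : entry.getUpdates();
    }
}
```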

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/revans2/incubator-storm STORM-3162

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/2836.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2836
    
----
commit 168a2075363be53d131f4ded41d29332ddce0f7b
Author: Robert (Bobby) Evans <ev...@...>
Date:   2018-09-14T19:48:33Z

    STORM-3162: Cleanup heartbeats cache and make it thread safe

----


---

[GitHub] storm pull request #2836: STORM-3162: Cleanup heartbeats cache and make it t...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2836#discussion_r218142320
  
    --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/HeartbeatCache.java ---
    @@ -0,0 +1,229 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.daemon.nimbus;
    +
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.function.Function;
    +import org.apache.storm.generated.Assignment;
    +import org.apache.storm.generated.ExecutorInfo;
    +import org.apache.storm.generated.SupervisorWorkerHeartbeat;
    +import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
    +import org.apache.storm.stats.ClientStatsUtil;
    +import org.apache.storm.stats.StatsUtil;
    +import org.apache.storm.utils.Time;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Holds a cache of heartbeats from the workers.
    + */
    +public class HeartbeatCache {
    +    private static final Logger LOG = LoggerFactory.getLogger(HeartbeatCache.class);
    +    private static final Function<String, ConcurrentHashMap<List<Integer>, ExecutorCache>> MAKE_MAP = (k) -> new ConcurrentHashMap<>();
    +
    +    private static class ExecutorCache {
    +        private Boolean isTimedOut;
    +        private Integer nimbusTime;
    +        private Integer executorReportedTime;
    +
    +        public ExecutorCache(Map<String, Object> newBeat) {
    +            if (newBeat != null) {
    +                executorReportedTime = (Integer) newBeat.getOrDefault(ClientStatsUtil.TIME_SECS, 0);
    +            } else {
    +                executorReportedTime = 0;
    +            }
    +
    +            nimbusTime = Time.currentTimeSecs();
    +            isTimedOut = false;
    +        }
    +
    +        public ExecutorCache(boolean isTimedOut, Integer nimbusTime, Integer executorReportedTime) {
    +            this.isTimedOut = isTimedOut;
    +            this.nimbusTime = nimbusTime;
    +            this.executorReportedTime = executorReportedTime;
    +        }
    +
    +        public synchronized Boolean isTimedOut() {
    +            return isTimedOut;
    +        }
    +
    +        public synchronized Integer getNimbusTime() {
    +            return nimbusTime;
    +        }
    +
    +        public synchronized Integer getExecutorReportedTime() {
    +            return executorReportedTime;
    +        }
    +
    +        public synchronized void updateTimeout(Integer timeout) {
    +            isTimedOut = Time.deltaSecs(getNimbusTime()) >= timeout;
    +        }
    +
    +        public synchronized void updateFromHb(Integer timeout, Map<String,Object> newBeat) {
    +            if (newBeat != null) {
    +                Integer newReportedTime = (Integer) newBeat.getOrDefault(ClientStatsUtil.TIME_SECS, 0);
    +                if (!newReportedTime.equals(executorReportedTime)) {
    +                    nimbusTime = Time.currentTimeSecs();
    +                }
    +                executorReportedTime = newReportedTime;
    +            }
    +            updateTimeout(timeout);
    +        }
    +    }
    +
    +    //Topology Id -> executor id [start task, end task] -> ExecutorCache
    +    private final ConcurrentHashMap<String, ConcurrentHashMap<List<Integer>, ExecutorCache>> cache;
    +
    +    /**
    +     * Create an empty cache.
    +     */
    +    public HeartbeatCache() {
    +        this.cache = new ConcurrentHashMap<>();
    +    }
    +
    +    /**
    +     * Add an empty topology to the cache for testing purposes.
    +     * @param topoId the id of the topology to add.
    +     */
    +    @VisibleForTesting
    +    public void addEmptyTopoForTests(String topoId) {
    +        cache.put(topoId, new ConcurrentHashMap<>());
    +    }
    +
    +    /**
    +     * Get the number of topologies with cached heartbeats.
    +     * @return the number of topologies with cached heartbeats.
    +     */
    +    @VisibleForTesting
    +    public int getNumToposCached() {
    +        return cache.size();
    +    }
    +
    +    /**
    +     * Get the topology ids with cached heartbeats.
    +     * @return the set of topology ids with cached heartbeats.
    +     */
    +    @VisibleForTesting
    +    public Set<String> getTopologyIds() {
    +        return cache.keySet();
    +    }
    +
    +    /**
    +     * Remove a specific topology from the cache.
    +     * @param topoId the id of the topology to remove.
    +     */
    +    public void removeTopo(String topoId) {
    +        cache.remove(topoId);
    +    }
    +
    +    /**
    +     * Update the heartbeats for a topology with no heartbeats that came in.
    +     * @param topoId the id of the topology to look at.
    +     * @param taskTimeoutSecs the timeout to know if they are too old.
    +     */
    +    public void timeoutOldHeartbeats(String topoId, Integer taskTimeoutSecs) {
    +        Map<List<Integer>, ExecutorCache> topoCache = cache.computeIfAbsent(topoId, MAKE_MAP);
    +        //if there are no executor beats, refresh is-timed-out of the cache, which is done by master
    +        for (ExecutorCache ec : topoCache.values()) {
    +            ec.updateTimeout(taskTimeoutSecs);
    +        }
    +    }
    +
    +    /**
    +     * Update the cache with heartbeats from a worker through zookeeper.
    +     * @param topoId the id to the topology.
    +     * @param executorBeats the HB data.
    +     * @param allExecutors the executors.
    +     * @param timeout the timeout.
    +     */
    +    public void updateFromZkHeartbeat(String topoId, Map<List<Integer>, Map<String,Object>> executorBeats,
    +                                      Set<List<Integer>> allExecutors, Integer timeout) {
    +        Map<List<Integer>, ExecutorCache> topoCache = cache.computeIfAbsent(topoId, MAKE_MAP);
    +        if (executorBeats == null) {
    +            executorBeats = new HashMap<>();
    +        }
    +
    +        for (List<Integer> executor : allExecutors) {
    +            final Map<String, Object> newBeat = executorBeats.get(executor);
    +            ExecutorCache currBeat = topoCache.computeIfAbsent(executor, (k) -> new ExecutorCache(newBeat));
    +            currBeat.updateFromHb(timeout, newBeat);
    +        }
    +    }
    +
    +    /**
    +     * Update the heartbeats for a given worker.
    +     * @param workerHeartbeat the heartbeats from the worker.
    +     * @param taskTimeoutSecs the timeout we should be looking at.
    +     */
    +    public void updateHeartbeat(SupervisorWorkerHeartbeat workerHeartbeat, Integer taskTimeoutSecs) {
    +        Map<List<Integer>, Map<String, Object>> executorBeats = StatsUtil.convertWorkerBeats(workerHeartbeat);
    +        String topoId = workerHeartbeat.get_storm_id();
    +        Map<List<Integer>, ExecutorCache> topoCache = cache.computeIfAbsent(topoId, MAKE_MAP);
    +
    +        for (ExecutorInfo executorInfo : workerHeartbeat.get_executors()) {
    +            List<Integer> executor = Arrays.asList(executorInfo.get_task_start(), executorInfo.get_task_end());
    +            final Map<String, Object> newBeat = executorBeats.get(executor);
    +            ExecutorCache currBeat = topoCache.computeIfAbsent(executor, (k) -> new ExecutorCache(newBeat));
    +            currBeat.updateFromHb(taskTimeoutSecs, newBeat);
    +        }
    +    }
    +
    +    /**
    +     * Get all of the alive executors for a given topology.
    +     * @param topoId the id of the topology we are looking for.
    +     * @param allExecutors all of the executors for this topology.
    +     * @param assignment the current topology assignment.
    +     * @param taskLaunchSecs timeout for right after a worker is launched.
    +     * @return the set of tasks that are alive.
    +     */
    +    public Set<List<Integer>> getAliveExecutors(String topoId, Set<List<Integer>> allExecutors, Assignment assignment, int taskLaunchSecs) {
    +        Map<List<Integer>, ExecutorCache> topoCache = cache.computeIfAbsent(topoId, MAKE_MAP);
    +        LOG.debug("Computing alive executors for {}\nExecutors: {}\nAssignment: {}\nHeartbeat cache: {}",
    +            topoId, allExecutors, assignment, topoCache);
    +
    +        Set<List<Integer>> ret = new HashSet<>();
    +        Map<List<Long>, Long> execToStartTimes = assignment.get_executor_start_time_secs();
    +
    +        for (List<Integer> exec : allExecutors) {
    +            List<Long> longExec = new ArrayList<>(exec.size());
    +            for (Integer num : exec) {
    +                longExec.add(num.longValue());
    +            }
    +
    +            Long startTime = execToStartTimes.get(longExec);
    +            ExecutorCache executorCache = topoCache.get(exec);
    +            //null isTimedOut means worker never reported any heartbeat
    +            Boolean isTimedOut = executorCache == null ? null : executorCache.isTimedOut();
    --- End diff --
    
    Nit: It would probably be simpler to set isTimedOut to false if executorCache is null. That would avoid the extra null check below.
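The getAliveExecutors excerpt above widens each executor id to List<Long> before looking it up in assignment.get_executor_start_time_secs(), and ClientStatsUtil.convertExecutor narrows in the other direction. The conversion matters because a boxed Integer never equals a boxed Long, so a List<Integer> key silently misses in a List<Long>-keyed map. A small sketch of that lookup; ExecutorKeys is a hypothetical helper name:

```java
import java.util.ArrayList;
import java.util.List;

final class ExecutorKeys {
    // Widen an executor id [start, end] from Integer to Long, the same loop
    // getAliveExecutors uses before querying the Long-keyed start-time map.
    static List<Long> toLongExecutor(List<Integer> exec) {
        List<Long> longExec = new ArrayList<>(exec.size());
        for (Integer num : exec) {
            longExec.add(num.longValue());
        }
        return longExec;
    }
}
```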


---

[GitHub] storm pull request #2836: STORM-3162: Cleanup heartbeats cache and make it t...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2836#discussion_r218121407
  
    --- Diff: storm-server/pom.xml ---
    @@ -171,7 +171,7 @@
                     <artifactId>maven-checkstyle-plugin</artifactId>
                     <!--Note - the version would be inherited-->
                     <configuration>
    -                    <maxAllowedViolations>780</maxAllowedViolations>
    +                    <maxAllowedViolations>853</maxAllowedViolations>
    --- End diff --
    
    This number seems to be moving in the wrong direction :)


---

[GitHub] storm pull request #2836: STORM-3162: Cleanup heartbeats cache and make it t...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2836#discussion_r218177587
  
    --- Diff: storm-server/pom.xml ---
    @@ -171,7 +171,7 @@
                     <artifactId>maven-checkstyle-plugin</artifactId>
                     <!--Note - the version would be inherited-->
                     <configuration>
    -                    <maxAllowedViolations>780</maxAllowedViolations>
    +                    <maxAllowedViolations>853</maxAllowedViolations>
    --- End diff --
    
    No, it's fine. 


---

[GitHub] storm issue #2836: STORM-3162: Cleanup heartbeats cache and make it thread s...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/2836
  
    Looks good, thanks. +1


---

[GitHub] storm pull request #2836: STORM-3162: Cleanup heartbeats cache and make it t...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2836#discussion_r218173878
  
    --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/HeartbeatCache.java ---
    @@ -0,0 +1,229 @@
    +     * Update the heartbeats for a topology with no heartbeats that came in.
    --- End diff --
    
    Yes, it is very confusing. I'll fix it.
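The Javadoc being discussed belongs to timeoutOldHeartbeats, which runs when no new heartbeats arrived for a topology and only re-evaluates isTimedOut for each cached executor. The rule in ExecutorCache can be restated as a sketch with an injectable clock (the real class reads org.apache.storm.utils.Time; ExecutorBeatState and the IntSupplier clock are assumptions for illustration): Nimbus refreshes its own timestamp only when the executor-reported time changes, and an executor counts as timed out once that timestamp is at least timeoutSecs old.

```java
import java.util.function.IntSupplier;

// Hypothetical restatement of ExecutorCache's timeout rule with a pluggable clock.
final class ExecutorBeatState {
    private final IntSupplier clockSecs;
    private int nimbusTime;           // when Nimbus last saw the reported time change
    private int executorReportedTime;
    private boolean timedOut;

    ExecutorBeatState(IntSupplier clockSecs, int reportedTime) {
        this.clockSecs = clockSecs;
        this.executorReportedTime = reportedTime;
        this.nimbusTime = clockSecs.getAsInt();
    }

    // A heartbeat arrived. A repeated (stale) reported time does not refresh
    // nimbusTime, so a worker that keeps replaying the same beat still times out.
    synchronized void updateFromHb(int timeoutSecs, Integer newReportedTime) {
        if (newReportedTime != null) {
            if (newReportedTime != executorReportedTime) {
                nimbusTime = clockSecs.getAsInt();
            }
            executorReportedTime = newReportedTime;
        }
        updateTimeout(timeoutSecs);
    }

    // No heartbeat arrived: only re-evaluate the age of the last observed change.
    synchronized void updateTimeout(int timeoutSecs) {
        timedOut = clockSecs.getAsInt() - nimbusTime >= timeoutSecs;
    }

    synchronized boolean isTimedOut() {
        return timedOut;
    }
}
```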


---

[GitHub] storm pull request #2836: STORM-3162: Cleanup heartbeats cache and make it t...

Posted by kishorvpatil <gi...@git.apache.org>.
Github user kishorvpatil commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2836#discussion_r218086663
  
    --- Diff: storm-client/src/jvm/org/apache/storm/stats/ClientStatsUtil.java ---
    @@ -0,0 +1,202 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.stats;
    +
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import org.apache.storm.generated.ClusterWorkerHeartbeat;
    +import org.apache.storm.generated.ExecutorInfo;
    +import org.apache.storm.generated.ExecutorStats;
    +import org.apache.storm.generated.GlobalStreamId;
    +import org.apache.storm.shade.com.google.common.collect.Lists;
    +import org.apache.storm.utils.Time;
    +
    +/**
    + * Stats calculations needed by storm client code.
    + */
    +public class ClientStatsUtil {
    +    public static final String SPOUT = "spout";
    +    public static final String BOLT = "bolt";
    +    static final String EXECUTOR_STATS = "executor-stats";
    +    static final String UPTIME = "uptime";
    +    public static final String TIME_SECS = "time-secs";
    +    public static final ToGlobalStreamIdTransformer TO_GSID = new ToGlobalStreamIdTransformer();
    +    public static final IdentityTransformer IDENTITY = new IdentityTransformer();
    +
    +    /**
    +     * Convert a List<Long> executor to java List<Integer>.
    +     */
    +    public static List<Integer> convertExecutor(List<Long> executor) {
    +        return Lists.newArrayList(executor.get(0).intValue(), executor.get(1).intValue());
    +    }
    +
    +    /**
    +     * Make a map of executors to empty stats.
    +     * @param executors the executors as keys of the map.
    +     * @return an empty map of executors to stats.
    +     */
    +    public static Map<List<Integer>, ExecutorStats> mkEmptyExecutorZkHbs(Set<List<Long>> executors) {
    +        Map<List<Integer>, ExecutorStats> ret = new HashMap<>();
    +        for (Object executor : executors) {
    +            List startEnd = (List) executor;
    +            ret.put(convertExecutor(startEnd), null);
    +        }
    +        return ret;
    +    }
    +
    +    /**
    +     * Convert the Long executor ids in ZK heartbeats to Integer ids.
    +     */
    +    public static Map<List<Integer>, ExecutorStats> convertExecutorZkHbs(Map<List<Long>, ExecutorStats> executorBeats) {
    +        Map<List<Integer>, ExecutorStats> ret = new HashMap<>();
    +        for (Map.Entry<List<Long>, ExecutorStats> entry : executorBeats.entrySet()) {
    +            ret.put(convertExecutor(entry.getKey()), entry.getValue());
    +        }
    +        return ret;
    +    }
    +
    +    /**
    +     * Create a new worker heartbeat for zookeeper.
    +     * @param topoId the topology id
    +     * @param executorStats the stats for the executors
    +     * @param uptime the uptime for the worker.
    +     * @return the heartbeat map.
    +     */
    +    public static Map<String, Object> mkZkWorkerHb(String topoId, Map<List<Integer>, ExecutorStats> executorStats, Integer uptime) {
    +        Map<String, Object> ret = new HashMap<>();
    +        ret.put("storm-id", topoId);
    +        ret.put(EXECUTOR_STATS, executorStats);
    +        ret.put(UPTIME, uptime);
    +        ret.put(TIME_SECS, Time.currentTimeSecs());
    +
    +        return ret;
    +    }
    +
    +    private static Number getByKeyOr0(Map<String, Object> m, String k) {
    +        if (m == null) {
    +            return 0;
    +        }
    +
    +        Number n = (Number) m.get(k);
    +        if (n == null) {
    +            return 0;
    +        }
    +        return n;
    +    }
    +
    +    /**
    +     * Get a sub-map by a given key.
    +     * @param map the original map
    +     * @param key the key to get it from.
    +     * @return the map stored under key.
    +     */
    +    public static <K, V> Map<K, V> getMapByKey(Map map, String key) {
    +        if (map == null) {
    +            return null;
    +        }
    +        return (Map<K, V>) map.get(key);
    +    }
    +
    +    public static ClusterWorkerHeartbeat thriftifyZkWorkerHb(Map<String, Object> heartbeat) {
    +        ClusterWorkerHeartbeat ret = new ClusterWorkerHeartbeat();
    +        ret.set_uptime_secs(getByKeyOr0(heartbeat, UPTIME).intValue());
    +        ret.set_storm_id((String) heartbeat.get("storm-id"));
    +        ret.set_time_secs(getByKeyOr0(heartbeat, TIME_SECS).intValue());
    +
    +        Map<ExecutorInfo, ExecutorStats> convertedStats = new HashMap<>();
    +
    +        Map<List<Integer>, ExecutorStats> executorStats = getMapByKey(heartbeat, EXECUTOR_STATS);
    +        if (executorStats != null) {
    +            for (Map.Entry<List<Integer>, ExecutorStats> entry : executorStats.entrySet()) {
    +                List<Integer> executor = entry.getKey();
    +                ExecutorStats stats = entry.getValue();
    +                if (null != stats) {
    +                    convertedStats.put(new ExecutorInfo(executor.get(0), executor.get(1)), stats);
    +                }
    +            }
    +        }
    +        ret.set_executor_stats(convertedStats);
    +
    +        return ret;
    +    }
    +
    +    /**
    +     * Converts stats to be over given windows of time.
    +     * @param stats the stats
    +     * @param secKeyFunc transform the sub-key
    +     * @param firstKeyFunc transform the main key
    +     */
    +    public static <K1, K2> Map windowSetConverter(
    +        Map stats, KeyTransformer<K2> secKeyFunc, KeyTransformer<K1> firstKeyFunc) {
    +        Map ret = new HashMap();
    +
    +        for (Object o : stats.entrySet()) {
    +            Map.Entry entry = (Map.Entry) o;
    +            K1 key1 = firstKeyFunc.transform(entry.getKey());
    +
    +            Map subRetMap = (Map) ret.get(key1);
    +            if (subRetMap == null) {
    +                subRetMap = new HashMap();
    +            }
    +            ret.put(key1, subRetMap);
    +
    +            Map value = (Map) entry.getValue();
    +            for (Object oo : value.entrySet()) {
    +                Map.Entry subEntry = (Map.Entry) oo;
    +                K2 key2 = secKeyFunc.transform(subEntry.getKey());
    +                subRetMap.put(key2, subEntry.getValue());
    +            }
    +        }
    +        return ret;
    +    }
    +
    +    // =====================================================================================
    +    // key transformers
    +    // =====================================================================================
    +
    +    /**
    +     * Provides a way to transform one key into another.
    +     * @param <T>
    +     */
    +    interface KeyTransformer<T> {
    +        T transform(Object key);
    +    }
    +
    +    static class ToGlobalStreamIdTransformer implements KeyTransformer<GlobalStreamId> {
    +        @Override
    +        public GlobalStreamId transform(Object key) {
    +            if (key instanceof List) {
    +                List l = (List) key;
    +                if (l.size() > 1) {
    +                    return new GlobalStreamId((String) l.get(0), (String) l.get(1));
    +                }
    +            }
    +            return new GlobalStreamId("", key.toString());
    --- End diff --
    
    Should this be marked as _DEFAULT_STREAM_ID_, or is it expected to be ignored?
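To make the question concrete: as written, ToGlobalStreamIdTransformer only builds a full id from a list with at least two elements; every other key becomes an empty component id with key.toString() as the stream id, so no default stream id is substituted. The sketch below restates that behavior, with a hypothetical StreamKey class standing in for the Thrift-generated GlobalStreamId.

```java
import java.util.List;

// Hypothetical stand-in for GlobalStreamId(componentId, streamId).
final class StreamKey {
    final String componentId;
    final String streamId;

    StreamKey(String componentId, String streamId) {
        this.componentId = componentId;
        this.streamId = streamId;
    }
}

final class StreamKeys {
    // Mirrors the quoted transform(): a [component, stream] list becomes a full id;
    // anything else (including a one-element list) falls back to ("", key.toString()).
    static StreamKey transform(Object key) {
        if (key instanceof List) {
            List<?> l = (List<?>) key;
            if (l.size() > 1) {
                return new StreamKey((String) l.get(0), (String) l.get(1));
            }
        }
        return new StreamKey("", key.toString());
    }
}
```

Note that a one-element list falls through to the fallback branch, so its stream id becomes the list's string form (for example "[s]").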


---

[GitHub] storm pull request #2836: STORM-3162: Cleanup heartbeats cache and make it t...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2836#discussion_r218174140
  
    --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
    @@ -4624,7 +4569,7 @@ public boolean isRemoteBlobExists(String blobKey) throws AuthorizationException,
             return true;
         }
     
    -    private static final class Assoc<K, V> implements UnaryOperator<Map<K, V>> {
    +    static final class Assoc<K, V> implements UnaryOperator<Map<K, V>> {
    --- End diff --
    
    Yes, I'll make them private again. I made them package-private because, while refactoring, I kept using Assoc and Dissoc until the very end, when I fixed the threading.
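For context on the Assoc/Dissoc remark: helpers of this shape return a modified copy of a map rather than mutating it, which suits AtomicReference.updateAndGet and getAndUpdate, because those may re-apply the function under contention and so expect it to be side-effect free. The sketch assumes that is how the old heartbeat cache used them; the old Nimbus code is not quoted here, and Dissoc's shape is inferred from its name. The atomic swap only covers the top-level map, though: reading a nested per-topology value, changing it, and writing it back as separate steps can still drop a concurrent update, plausibly the kind of race a ConcurrentHashMap-based cache avoids. AssocSketch and DissocSketch are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

// Copy-on-write "put": never mutates the input map, so it is safe to retry.
final class AssocSketch<K, V> implements UnaryOperator<Map<K, V>> {
    private final K key;
    private final V value;

    AssocSketch(K key, V value) {
        this.key = key;
        this.value = value;
    }

    @Override
    public Map<K, V> apply(Map<K, V> old) {
        Map<K, V> copy = new HashMap<>(old);
        copy.put(key, value);
        return copy;
    }
}

// Copy-on-write "remove" (assumed shape of Nimbus.Dissoc).
final class DissocSketch<K, V> implements UnaryOperator<Map<K, V>> {
    private final K key;

    DissocSketch(K key) {
        this.key = key;
    }

    @Override
    public Map<K, V> apply(Map<K, V> old) {
        Map<K, V> copy = new HashMap<>(old);
        copy.remove(key);
        return copy;
    }
}
```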


---

[GitHub] storm pull request #2836: STORM-3162: Cleanup heartbeats cache and make it t...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2836#discussion_r218088968
  
    --- Diff: storm-client/src/jvm/org/apache/storm/stats/ClientStatsUtil.java ---
    @@ -0,0 +1,202 @@
    +        return n;
    +    }
    +
    +    /**
    +     * Get a sub-map by a given key.
    +     * @param map the original map
    +     * @param key the key to get it from.
    +     * @return the map stored under key.
    +     */
    +    public static <K, V> Map<K, V> getMapByKey(Map map, String key) {
    +        if (map == null) {
    +            return null;
    +        }
    +        return (Map<K, V>) map.get(key);
    +    }
    +
    +    public static ClusterWorkerHeartbeat thriftifyZkWorkerHb(Map<String, Object> heartbeat) {
    +        ClusterWorkerHeartbeat ret = new ClusterWorkerHeartbeat();
    +        ret.set_uptime_secs(getByKeyOr0(heartbeat, UPTIME).intValue());
    +        ret.set_storm_id((String) heartbeat.get("storm-id"));
    +        ret.set_time_secs(getByKeyOr0(heartbeat, TIME_SECS).intValue());
    +
    +        Map<ExecutorInfo, ExecutorStats> convertedStats = new HashMap<>();
    +
    +        Map<List<Integer>, ExecutorStats> executorStats = getMapByKey(heartbeat, EXECUTOR_STATS);
    +        if (executorStats != null) {
    +            for (Map.Entry<List<Integer>, ExecutorStats> entry : executorStats.entrySet()) {
    +                List<Integer> executor = entry.getKey();
    +                ExecutorStats stats = entry.getValue();
    +                if (null != stats) {
    +                    convertedStats.put(new ExecutorInfo(executor.get(0), executor.get(1)), stats);
    +                }
    +            }
    +        }
    +        ret.set_executor_stats(convertedStats);
    +
    +        return ret;
    +    }
    +
    +    /**
    +     * Converts stats to be over given windows of time.
    +     * @param stats the stats
    +     * @param secKeyFunc transform the sub-key
    +     * @param firstKeyFunc transform the main key
    +     */
    +    public static <K1, K2> Map windowSetConverter(
    +        Map stats, KeyTransformer<K2> secKeyFunc, KeyTransformer<K1> firstKeyFunc) {
    +        Map ret = new HashMap();
    +
    +        for (Object o : stats.entrySet()) {
    +            Map.Entry entry = (Map.Entry) o;
    +            K1 key1 = firstKeyFunc.transform(entry.getKey());
    +
    +            Map subRetMap = (Map) ret.get(key1);
    +            if (subRetMap == null) {
    +                subRetMap = new HashMap();
    +            }
    +            ret.put(key1, subRetMap);
    +
    +            Map value = (Map) entry.getValue();
    +            for (Object oo : value.entrySet()) {
    +                Map.Entry subEntry = (Map.Entry) oo;
    +                K2 key2 = secKeyFunc.transform(subEntry.getKey());
    +                subRetMap.put(key2, subEntry.getValue());
    +            }
    +        }
    +        return ret;
    +    }
    +
    +    // =====================================================================================
    +    // key transformers
    +    // =====================================================================================
    +
    +    /**
    +     * Provides a way to transform one key into another.
    +     * @param <T>
    +     */
    +    interface KeyTransformer<T> {
    +        T transform(Object key);
    +    }
    +
    +    static class ToGlobalStreamIdTransformer implements KeyTransformer<GlobalStreamId> {
    +        @Override
    +        public GlobalStreamId transform(Object key) {
    +            if (key instanceof List) {
    +                List l = (List) key;
    +                if (l.size() > 1) {
    +                    return new GlobalStreamId((String) l.get(0), (String) l.get(1));
    +                }
    +            }
    +            return new GlobalStreamId("", key.toString());
    --- End diff --
    
    This code was just moved from StatsUtil.java to ClientStatsUtil.java. If there is a bug here, it was already there before this change, and we can address it in a follow-on JIRA.
    
    https://github.com/apache/storm/blob/4605ae0a34858309171e726e1924b9a37695c977/storm-client/src/jvm/org/apache/storm/stats/StatsUtil.java#L2585
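For readers following along, the moved key-transformer logic can be exercised outside Storm. This is a minimal sketch, not the Storm code:

- `StreamId` is a hypothetical stand-in for the Thrift-generated `GlobalStreamId`.
- `convert` is a generically typed rewrite of `windowSetConverter`. It uses `java.util.function.Function` in place of the package-private `KeyTransformer` interface.

It reproduces the behavior under discussion. A `[component, stream]` list becomes a stream id. Any other key falls back to an empty component id, with `key.toString()` as the stream name. Records require Java 16 or later.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class WindowSetConverterSketch {
    // Hypothetical stand-in for the Thrift-generated GlobalStreamId.
    record StreamId(String componentId, String streamId) { }

    // Same decision as the quoted ToGlobalStreamIdTransformer: a list with at least two
    // elements is [component, stream]; anything else becomes a stream with an empty component.
    static StreamId toStreamId(Object key) {
        if (key instanceof List) {
            List<?> l = (List<?>) key;
            if (l.size() > 1) {
                return new StreamId((String) l.get(0), (String) l.get(1));
            }
        }
        return new StreamId("", key.toString());
    }

    // Generically typed version of windowSetConverter: outer keys go through firstKeyFunc,
    // inner keys through secKeyFunc, and inner maps whose outer keys collide are merged.
    static <K1, K2, V> Map<K1, Map<K2, V>> convert(Map<?, ? extends Map<?, V>> stats,
            Function<Object, K2> secKeyFunc, Function<Object, K1> firstKeyFunc) {
        Map<K1, Map<K2, V>> ret = new HashMap<>();
        for (Map.Entry<?, ? extends Map<?, V>> entry : stats.entrySet()) {
            Map<K2, V> sub = ret.computeIfAbsent(firstKeyFunc.apply(entry.getKey()), k -> new HashMap<>());
            for (Map.Entry<?, V> subEntry : entry.getValue().entrySet()) {
                sub.put(secKeyFunc.apply(subEntry.getKey()), subEntry.getValue());
            }
        }
        return ret;
    }

    public static void main(String[] args) {
        Map<Object, Long> byStream = new HashMap<>();
        byStream.put(List.of("spout1", "default"), 10L);
        byStream.put("__ack_ack", 3L);
        Map<String, Map<Object, Long>> emitted = new HashMap<>();
        emitted.put("600", byStream); // window "600" -> stream key -> count
        Map<Object, Map<StreamId, Long>> converted =
            convert(emitted, WindowSetConverterSketch::toStreamId, k -> k);
        System.out.println(converted.get("600").get(new StreamId("spout1", "default"))); // 10
        System.out.println(converted.get("600").get(new StreamId("", "__ack_ack")));     // 3
    }
}
```

A list with fewer than two elements also takes the fallback path. So `List.of("x")` becomes a stream named `[x]`, which is one edge case a follow-on JIRA could examine.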


---

[GitHub] storm pull request #2836: STORM-3162: Cleanup heartbeats cache and make it t...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/2836


---

[GitHub] storm pull request #2836: STORM-3162: Cleanup heartbeats cache and make it t...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2836#discussion_r218124022
  
    --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/HeartbeatCache.java ---
    @@ -0,0 +1,229 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.daemon.nimbus;
    +
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.function.Function;
    +import org.apache.storm.generated.Assignment;
    +import org.apache.storm.generated.ExecutorInfo;
    +import org.apache.storm.generated.SupervisorWorkerHeartbeat;
    +import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
    +import org.apache.storm.stats.ClientStatsUtil;
    +import org.apache.storm.stats.StatsUtil;
    +import org.apache.storm.utils.Time;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Holds a cache of heartbeats from the workers.
    + */
    +public class HeartbeatCache {
    +    private static final Logger LOG = LoggerFactory.getLogger(HeartbeatCache.class);
    +    private static final Function<String, ConcurrentHashMap<List<Integer>, ExecutorCache>> MAKE_MAP = (k) -> new ConcurrentHashMap<>();
    +
    +    private static class ExecutorCache {
    +        private Boolean isTimedOut;
    +        private Integer nimbusTime;
    +        private Integer executorReportedTime;
    +
    +        public ExecutorCache(Map<String, Object> newBeat) {
    +            if (newBeat != null) {
    +                executorReportedTime = (Integer) newBeat.getOrDefault(ClientStatsUtil.TIME_SECS, 0);
    +            } else {
    +                executorReportedTime = 0;
    +            }
    +
    +            nimbusTime = Time.currentTimeSecs();
    +            isTimedOut = false;
    +        }
    +
    +        public ExecutorCache(boolean isTimedOut, Integer nimbusTime, Integer executorReportedTime) {
    +            this.isTimedOut = isTimedOut;
    +            this.nimbusTime = nimbusTime;
    +            this.executorReportedTime = executorReportedTime;
    +        }
    +
    +        public synchronized Boolean isTimedOut() {
    +            return isTimedOut;
    +        }
    +
    +        public synchronized Integer getNimbusTime() {
    +            return nimbusTime;
    +        }
    +
    +        public synchronized Integer getExecutorReportedTime() {
    +            return executorReportedTime;
    +        }
    +
    +        public synchronized void updateTimeout(Integer timeout) {
    +            isTimedOut = Time.deltaSecs(getNimbusTime()) >= timeout;
    +        }
    +
    +        public synchronized void updateFromHb(Integer timeout, Map<String,Object> newBeat) {
    +            if (newBeat != null) {
    +                Integer newReportedTime = (Integer) newBeat.getOrDefault(ClientStatsUtil.TIME_SECS, 0);
    +                if (!newReportedTime.equals(executorReportedTime)) {
    +                    nimbusTime = Time.currentTimeSecs();
    +                }
    +                executorReportedTime = newReportedTime;
    +            }
    +            updateTimeout(timeout);
    +        }
    +    }
    +
    +    //Topology Id -> executor ids -> component -> stats(...)
    +    private final ConcurrentHashMap<String, ConcurrentHashMap<List<Integer>, ExecutorCache>> cache;
    +
    +    /**
    +     * Create an empty cache.
    +     */
    +    public HeartbeatCache() {
    +        this.cache = new ConcurrentHashMap<>();
    +    }
    +
    +    /**
    +     * Add an empty topology to the cache for testing purposes.
    +     * @param topoId the id of the topology to add.
    +     */
    +    @VisibleForTesting
    +    public void addEmptyTopoForTests(String topoId) {
    +        cache.put(topoId, new ConcurrentHashMap<>());
    +    }
    +
    +    /**
    +     * Get the number of topologies with cached heartbeats.
    +     * @return the number of topologies with cached heartbeats.
    +     */
    +    @VisibleForTesting
    +    public int getNumToposCached() {
    +        return cache.size();
    +    }
    +
    +    /**
    +     * Get the topology ids with cached heartbeats.
    +     * @return the set of topology ids with cached heartbeats.
    +     */
    +    @VisibleForTesting
    +    public Set<String> getTopologyIds() {
    +        return cache.keySet();
    +    }
    +
    +    /**
    +     * Remove a specific topology from the cache.
    +     * @param topoId the id of the topology to remove.
    +     */
    +    public void removeTopo(String topoId) {
    +        cache.remove(topoId);
    +    }
    +
    +    /**
    +     * Update the heartbeats for a topology with no heartbeats that came in.
    +     * @param topoId the id of the topology to look at.
    +     * @param taskTimeoutSecs the timeout to know if they are too old.
    +     */
    +    public void timeoutOldHeartbeats(String topoId, Integer taskTimeoutSecs) {
    +        Map<List<Integer>, ExecutorCache> topoCache = cache.computeIfAbsent(topoId, MAKE_MAP);
    +        //if not executor beats, refresh is-timed-out of the cache which is done by master
    --- End diff --
    
    Does this comment still make sense?


---

[GitHub] storm pull request #2836: STORM-3162: Cleanup heartbeats cache and make it t...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2836#discussion_r218173746
  
    --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/HeartbeatCache.java ---
    @@ -0,0 +1,229 @@
    +    /**
    +     * Update the heartbeats for a topology with no heartbeats that came in.
    +     * @param topoId the id of the topology to look at.
    +     * @param taskTimeoutSecs the timeout to know if they are too old.
    +     */
    +    public void timeoutOldHeartbeats(String topoId, Integer taskTimeoutSecs) {
    +        Map<List<Integer>, ExecutorCache> topoCache = cache.computeIfAbsent(topoId, MAKE_MAP);
    +        //if not executor beats, refresh is-timed-out of the cache which is done by master
    --- End diff --
    
    The comment came from the original code I was refactoring. I'll improve it.
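To make the intent of that comment concrete, here is a hedged, simplified sketch of the `ExecutorCache` timeout rules in the quoted diff. It is not the code in the PR. The clock is injected as a hypothetical `IntSupplier` in place of `org.apache.storm.utils.Time`, and the fields are primitives.

The rules mirror the diff:

- `nimbusTime` only moves when the worker's reported `time-secs` changes.
- An executor is timed out once `now - nimbusTime >= timeout`.

```java
import java.util.Map;
import java.util.function.IntSupplier;

public class ExecutorTimeoutSketch {
    static final String TIME_SECS = "time-secs";

    // Hypothetical simplified ExecutorCache: the clock is injected instead of using
    // org.apache.storm.utils.Time so the logic can be exercised deterministically.
    static final class ExecutorCache {
        private final IntSupplier clock;
        private boolean isTimedOut = false;
        private int nimbusTime;           // nimbus clock when the reported time last changed
        private int executorReportedTime; // last "time-secs" value sent by the worker

        ExecutorCache(IntSupplier clock, Map<String, Object> newBeat) {
            this.clock = clock;
            this.executorReportedTime = newBeat == null ? 0 : (Integer) newBeat.getOrDefault(TIME_SECS, 0);
            this.nimbusTime = clock.getAsInt();
        }

        // Timed out once nimbus has seen no change in the reported time for `timeout` seconds.
        synchronized void updateTimeout(int timeout) {
            isTimedOut = clock.getAsInt() - nimbusTime >= timeout;
        }

        // A heartbeat only refreshes nimbusTime if the worker's reported time moved,
        // so a re-delivered stale heartbeat does not keep the executor alive.
        synchronized void updateFromHb(int timeout, Map<String, Object> newBeat) {
            if (newBeat != null) {
                int reported = (Integer) newBeat.getOrDefault(TIME_SECS, 0);
                if (reported != executorReportedTime) {
                    nimbusTime = clock.getAsInt();
                }
                executorReportedTime = reported;
            }
            updateTimeout(timeout);
        }

        synchronized boolean isTimedOut() {
            return isTimedOut;
        }
    }

    public static void main(String[] args) {
        int[] now = {100};
        ExecutorCache cache = new ExecutorCache(() -> now[0], Map.of(TIME_SECS, 95));
        now[0] = 120;
        cache.updateFromHb(30, Map.of(TIME_SECS, 95));  // same reported time: nimbusTime stays 100
        System.out.println(cache.isTimedOut());         // false (120 - 100 = 20 < 30)
        now[0] = 131;
        cache.updateFromHb(30, Map.of(TIME_SECS, 95));  // still stale
        System.out.println(cache.isTimedOut());         // true (131 - 100 = 31 >= 30)
        cache.updateFromHb(30, Map.of(TIME_SECS, 130)); // new report refreshes nimbusTime to 131
        System.out.println(cache.isTimedOut());         // false
    }
}
```

Keying the timeout off nimbus's own clock appears to sidestep clock skew between hosts. The worker's reported time is only used as a signal that the worker is still making progress.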


---

[GitHub] storm issue #2836: STORM-3162: Cleanup heartbeats cache and make it thread s...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/2836
  
    @srdo I think I addressed all of your review comments.


---

[GitHub] storm pull request #2836: STORM-3162: Cleanup heartbeats cache and make it t...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2836#discussion_r218173536
  
    --- Diff: storm-server/pom.xml ---
    @@ -171,7 +171,7 @@
                     <artifactId>maven-checkstyle-plugin</artifactId>
                     <!--Note - the version would be inherited-->
                     <configuration>
    -                    <maxAllowedViolations>780</maxAllowedViolations>
    +                    <maxAllowedViolations>853</maxAllowedViolations>
    --- End diff --
    
    Yes, these are all issues that came over with the StatsUtil code, which is why the other one went in a good direction. I can try to clean them up more if you want.


---

[GitHub] storm pull request #2836: STORM-3162: Cleanup heartbeats cache and make it t...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2836#discussion_r218140867
  
    --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
    @@ -4624,7 +4569,7 @@ public boolean isRemoteBlobExists(String blobKey) throws AuthorizationException,
             return true;
         }
     
    -    private static final class Assoc<K, V> implements UnaryOperator<Map<K, V>> {
    +    static final class Assoc<K, V> implements UnaryOperator<Map<K, V>> {
    --- End diff --
    
    Assoc and Dissoc don't seem to be used anywhere new, so is making them package-private still necessary?
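For context, here is a sketch of the copy-on-write pattern these helpers support. Only the class declaration appears in the diff. The bodies below are an assumption about what `Assoc` and `Dissoc` do: return a modified copy of the map, Clojure-style.

Combined with `AtomicReference.updateAndGet`, every update copies the whole map. That is one plausible reason a dedicated `HeartbeatCache` with nested `ConcurrentHashMap`s is easier to reason about.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;

public class CopyOnWriteMapSketch {
    // Assumed shape of Nimbus's Assoc: returns a copy of the map with one extra entry.
    static final class Assoc<K, V> implements UnaryOperator<Map<K, V>> {
        private final K key;
        private final V value;

        Assoc(K key, V value) {
            this.key = key;
            this.value = value;
        }

        @Override
        public Map<K, V> apply(Map<K, V> current) {
            Map<K, V> ret = new HashMap<>(current);
            ret.put(key, value);
            return ret;
        }
    }

    // Assumed shape of Dissoc: returns a copy of the map without the given key.
    static final class Dissoc<K, V> implements UnaryOperator<Map<K, V>> {
        private final K key;

        Dissoc(K key) {
            this.key = key;
        }

        @Override
        public Map<K, V> apply(Map<K, V> current) {
            Map<K, V> ret = new HashMap<>(current);
            ret.remove(key);
            return ret;
        }
    }

    public static void main(String[] args) {
        AtomicReference<Map<String, Integer>> ref = new AtomicReference<>(new HashMap<>());
        // updateAndGet may re-apply the operator under contention, so it must be side-effect free.
        ref.updateAndGet(new Assoc<>("topo-1", 1));
        ref.updateAndGet(new Assoc<>("topo-2", 2));
        ref.updateAndGet(new Dissoc<>("topo-1"));
        System.out.println(ref.get()); // {topo-2=2}
    }
}
```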


---

[GitHub] storm pull request #2836: STORM-3162: Cleanup heartbeats cache and make it t...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2836#discussion_r217911059
  
    --- Diff: storm-client/src/jvm/org/apache/storm/stats/ClientStatsUtil.java ---
    @@ -0,0 +1,202 @@
    +    /**
    +     * Make and map of executors to empty stats.
    --- End diff --
    
    Nit: a couple of places say "and" where they should say "a"/"an".


---

[GitHub] storm pull request #2836: STORM-3162: Cleanup heartbeats cache and make it t...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2836#discussion_r218140315
  
    --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/HeartbeatCache.java ---
    @@ -0,0 +1,229 @@
    +    /**
    +     * Update the heartbeats for a topology with no heartbeats that came in.
    --- End diff --
    
    I don't understand what this is saying.
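Here is one possible reading of that Javadoc, based on the quoted `timeoutOldHeartbeats` signature and its `computeIfAbsent` call. When no new heartbeats arrived for a topology, nimbus does not change any cached times. It only re-checks whether each cached executor has exceeded the timeout.

The sketch below assumes that behavior. `ExecutorEntry` and the injected clock are hypothetical simplifications, not Storm classes.

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.IntSupplier;

public class TimeoutSweepSketch {
    // Hypothetical minimal executor entry: only the fields the sweep needs.
    static final class ExecutorEntry {
        final int nimbusTime; // nimbus clock when this executor last reported a new time
        volatile boolean timedOut;

        ExecutorEntry(int nimbusTime) {
            this.nimbusTime = nimbusTime;
        }
    }

    // topology id -> executor [start, end] -> entry, mirroring the nested map in HeartbeatCache
    final ConcurrentHashMap<String, ConcurrentHashMap<List<Integer>, ExecutorEntry>> cache = new ConcurrentHashMap<>();
    final IntSupplier clock;

    TimeoutSweepSketch(IntSupplier clock) {
        this.clock = clock;
    }

    // Assumed behavior of timeoutOldHeartbeats: no new heartbeats arrived for this topology,
    // so only re-evaluate whether each already-cached executor has now timed out.
    void timeoutOldHeartbeats(String topoId, int taskTimeoutSecs) {
        ConcurrentHashMap<List<Integer>, ExecutorEntry> topoCache =
            cache.computeIfAbsent(topoId, k -> new ConcurrentHashMap<>());
        int now = clock.getAsInt();
        for (ExecutorEntry entry : topoCache.values()) {
            entry.timedOut = now - entry.nimbusTime >= taskTimeoutSecs;
        }
    }

    public static void main(String[] args) {
        int[] now = {200};
        TimeoutSweepSketch hb = new TimeoutSweepSketch(() -> now[0]);
        ConcurrentHashMap<List<Integer>, ExecutorEntry> topo = new ConcurrentHashMap<>();
        topo.put(List.of(1, 1), new ExecutorEntry(190)); // last new report 10s ago
        topo.put(List.of(2, 2), new ExecutorEntry(150)); // last new report 50s ago
        hb.cache.put("topo-1", topo);
        hb.timeoutOldHeartbeats("topo-1", 30);
        System.out.println(topo.get(List.of(1, 1)).timedOut); // false
        System.out.println(topo.get(List.of(2, 2)).timedOut); // true
    }
}
```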


---

[GitHub] storm pull request #2836: STORM-3162: Cleanup heartbeats cache and make it t...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2836#discussion_r218174414
  
    --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/HeartbeatCache.java ---
    @@ -0,0 +1,229 @@
    +        public synchronized void updateFromHb(Integer timeout, Map<String,Object> newBeat) {
    +            if (newBeat != null) {
    +                Integer newReportedTime = (Integer) newBeat.getOrDefault(ClientStatsUtil.TIME_SECS, 0);
    +                if (!newReportedTime.equals(executorReportedTime)) {
    +                    nimbusTime = Time.currentTimeSecs();
    +                }
    +                executorReportedTime = newReportedTime;
    +            }
    +            updateTimeout(timeout);
    +        }
    +    }
    +
    +    //Topology Id -> executor ids -> component -> stats(...)
    +    private final ConcurrentHashMap<String, ConcurrentHashMap<List<Integer>, ExecutorCache>> cache;
    +
    +    /**
    +     * Create an empty cache.
    +     */
    +    public HeartbeatCache() {
    +        this.cache = new ConcurrentHashMap<>();
    +    }
    +
    +    /**
    +     * Add an empty topology to the cache for testing purposes.
    +     * @param topoId the id of the topology to add.
    +     */
    +    @VisibleForTesting
    +    public void addEmptyTopoForTests(String topoId) {
    +        cache.put(topoId, new ConcurrentHashMap<>());
    +    }
    +
    +    /**
    +     * Get the number of topologies with cached heartbeats.
    +     * @return the number of topologies with cached heartbeats.
    +     */
    +    @VisibleForTesting
    +    public int getNumToposCached() {
    +        return cache.size();
    +    }
    +
    +    /**
    +     * Get the topology ids with cached heartbeats.
    +     * @return the set of topology ids with cached heartbeats.
    +     */
    +    @VisibleForTesting
    +    public Set<String> getTopologyIds() {
    +        return cache.keySet();
    +    }
    +
    +    /**
    +     * Remove a specific topology from the cache.
    +     * @param topoId the id of the topology to remove.
    +     */
    +    public void removeTopo(String topoId) {
    +        cache.remove(topoId);
    +    }
    +
    +    /**
    +     * Update the heartbeats for a topology with no heartbeats that came in.
    +     * @param topoId the id of the topology to look at.
    +     * @param taskTimeoutSecs the timeout to know if they are too old.
    +     */
    +    public void timeoutOldHeartbeats(String topoId, Integer taskTimeoutSecs) {
    +        Map<List<Integer>, ExecutorCache> topoCache = cache.computeIfAbsent(topoId, MAKE_MAP);
    +        //if not executor beats, refresh is-timed-out of the cache which is done by master
    +        for (ExecutorCache ec : topoCache.values()) {
    +            ec.updateTimeout(taskTimeoutSecs);
    +        }
    +    }
    +
    +    /**
    +     * Update the cache with heartbeats from a worker through zookeeper.
    +     * @param topoId the id to the topology.
    +     * @param executorBeats the HB data.
    +     * @param allExecutors the executors.
    +     * @param timeout the timeout.
    +     */
    +    public void updateFromZkHeartbeat(String topoId, Map<List<Integer>, Map<String,Object>> executorBeats,
    +                                      Set<List<Integer>> allExecutors, Integer timeout) {
    +        Map<List<Integer>, ExecutorCache> topoCache = cache.computeIfAbsent(topoId, MAKE_MAP);
    +        if (executorBeats == null) {
    +            executorBeats = new HashMap<>();
    +        }
    +
    +        for (List<Integer> executor : allExecutors) {
    +            final Map<String, Object> newBeat = executorBeats.get(executor);
    +            ExecutorCache currBeat = topoCache.computeIfAbsent(executor, (k) -> new ExecutorCache(newBeat));
    +            currBeat.updateFromHb(timeout, newBeat);
    +        }
    +    }
    +
    +    /**
    +     * Update the heartbeats for a given worker.
    +     * @param workerHeartbeat the heartbeats from the worker.
    +     * @param taskTimeoutSecs the timeout we should be looking at.
    +     */
    +    public void updateHeartbeat(SupervisorWorkerHeartbeat workerHeartbeat, Integer taskTimeoutSecs) {
    +        Map<List<Integer>, Map<String, Object>> executorBeats = StatsUtil.convertWorkerBeats(workerHeartbeat);
    +        String topoId = workerHeartbeat.get_storm_id();
    +        Map<List<Integer>, ExecutorCache> topoCache = cache.computeIfAbsent(topoId, MAKE_MAP);
    +
    +        for (ExecutorInfo executorInfo : workerHeartbeat.get_executors()) {
    +            List<Integer> executor = Arrays.asList(executorInfo.get_task_start(), executorInfo.get_task_end());
    +            final Map<String, Object> newBeat = executorBeats.get(executor);
    +            ExecutorCache currBeat = topoCache.computeIfAbsent(executor, (k) -> new ExecutorCache(newBeat));
    +            currBeat.updateFromHb(taskTimeoutSecs, newBeat);
    +        }
    +    }
    +
    +    /**
    +     * Get all of the alive executors for a given topology.
    +     * @param topoId the id of the topology we are looking for.
    +     * @param allExecutors all of the executors for this topology.
    +     * @param assignment the current topology assignment.
    +     * @param taskLaunchSecs timeout for right after a worker is launched.
    +     * @return the set of tasks that are alive.
    +     */
    +    public Set<List<Integer>> getAliveExecutors(String topoId, Set<List<Integer>> allExecutors, Assignment assignment, int taskLaunchSecs) {
    +        Map<List<Integer>, ExecutorCache> topoCache = cache.computeIfAbsent(topoId, MAKE_MAP);
    +        LOG.debug("Computing alive executors for {}\nExecutors: {}\nAssignment: {}\nHeartbeat cache: {}",
    +            topoId, allExecutors, assignment, topoCache);
    +
    +        Set<List<Integer>> ret = new HashSet<>();
    +        Map<List<Long>, Long> execToStartTimes = assignment.get_executor_start_time_secs();
    +
    +        for (List<Integer> exec : allExecutors) {
    +            List<Long> longExec = new ArrayList<>(exec.size());
    +            for (Integer num : exec) {
    +                longExec.add(num.longValue());
    +            }
    +
    +            Long startTime = execToStartTimes.get(longExec);
    +            ExecutorCache executorCache = topoCache.get(exec);
    +            //null isTimedOut means worker never reported any heartbeat
    +            Boolean isTimedOut = executorCache == null ? null : executorCache.isTimedOut();
    --- End diff --
    
    Yes, I agree. Will do. Again, this was code that was already there and I just refactored it, but I am happy to fix it.
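The quoted line packs three states into a nullable `Boolean`. As the comment above it says, `null` means the worker never reported a heartbeat for the executor, `true` means it timed out, and `false` means it is alive. The sketch below shows one way to name those states explicitly. It is only an illustration under assumptions: `HeartbeatState`, `CachedBeat`, and `stateOf` are hypothetical names, not Storm APIs, and this is not necessarily the change that went into the PR.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ExecutorLivenessSketch {
    // Hypothetical enum replacing the nullable Boolean: each state has a name.
    public enum HeartbeatState { NEVER_REPORTED, ALIVE, TIMED_OUT }

    // Hypothetical stand-in for the per-executor cache entry; only the timeout flag matters here.
    public static final class CachedBeat {
        final boolean timedOut;

        public CachedBeat(boolean timedOut) {
            this.timedOut = timedOut;
        }
    }

    public static HeartbeatState stateOf(Map<List<Integer>, CachedBeat> topoCache, List<Integer> exec) {
        CachedBeat beat = topoCache.get(exec);
        if (beat == null) {
            // No cache entry: the worker never sent a heartbeat for this executor.
            return HeartbeatState.NEVER_REPORTED;
        }
        return beat.timedOut ? HeartbeatState.TIMED_OUT : HeartbeatState.ALIVE;
    }

    public static void main(String[] args) {
        Map<List<Integer>, CachedBeat> topoCache = new HashMap<>();
        topoCache.put(Arrays.asList(1, 1), new CachedBeat(false));
        topoCache.put(Arrays.asList(2, 3), new CachedBeat(true));
        System.out.println(stateOf(topoCache, Arrays.asList(1, 1))); // ALIVE
        System.out.println(stateOf(topoCache, Arrays.asList(2, 3))); // TIMED_OUT
        System.out.println(stateOf(topoCache, Arrays.asList(4, 4))); // NEVER_REPORTED
    }
}
```

With an enum, a caller that handles alive and timed-out executors but forgets the never-reported case gets a visible gap in its `switch`, instead of a `NullPointerException` from unboxing `null`.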


---

[GitHub] storm pull request #2836: STORM-3162: Cleanup heartbeats cache and make it t...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2836#discussion_r217911368
  
    --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/HeartbeatCache.java ---
    @@ -0,0 +1,229 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.daemon.nimbus;
    +
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.function.Function;
    +import org.apache.storm.generated.Assignment;
    +import org.apache.storm.generated.ExecutorInfo;
    +import org.apache.storm.generated.SupervisorWorkerHeartbeat;
    +import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
    +import org.apache.storm.stats.ClientStatsUtil;
    +import org.apache.storm.stats.StatsUtil;
    +import org.apache.storm.utils.Time;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Holds a cache of heartbeats from the workers.
    + */
    +public class HeartbeatCache {
    +    private static final Logger LOG = LoggerFactory.getLogger(HeartbeatCache.class);
    +    private static final Function<String, ConcurrentHashMap<List<Integer>, ExecutorCache>> MAKE_MAP = (k) -> new ConcurrentHashMap<>();
    +
    +    private static class ExecutorCache {
    +        private Boolean isTimedOut;
    +        private Integer nimbusTime;
    --- End diff --
    
    Please rename to include the unit of time.
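For illustration, here is roughly what `ExecutorCache` from the diff could look like with the unit in each time field's name. The names `nimbusTimeSecs` and `executorReportedTimeSecs` are hypothetical. So is the injected `IntSupplier` clock, which stands in for Storm's `Time` utility so the sketch can run on its own. The heartbeat map is also simplified to just the reported time in seconds. The timeout logic mirrors the diff: an executor counts as timed out once Nimbus has gone at least `timeoutSecs` without seeing its reported time change.

```java
import java.util.function.IntSupplier;

// Hypothetical sketch of ExecutorCache with units in the field names (not the code in the PR).
public class ExecutorCacheSketch {
    private final IntSupplier clockSecs;   // current time in seconds (assumed injectable)
    private boolean isTimedOut;
    private int nimbusTimeSecs;            // Nimbus time when the reported time last changed
    private int executorReportedTimeSecs;  // time the executor put in its own heartbeat

    public ExecutorCacheSketch(IntSupplier clockSecs, int executorReportedTimeSecs) {
        this.clockSecs = clockSecs;
        this.executorReportedTimeSecs = executorReportedTimeSecs;
        this.nimbusTimeSecs = clockSecs.getAsInt();
        this.isTimedOut = false;
    }

    public synchronized boolean isTimedOut() {
        return isTimedOut;
    }

    public synchronized void updateTimeout(int timeoutSecs) {
        // Timed out once Nimbus has seen no progress for at least timeoutSecs.
        isTimedOut = clockSecs.getAsInt() - nimbusTimeSecs >= timeoutSecs;
    }

    public synchronized void updateFromHb(int timeoutSecs, Integer newReportedTimeSecs) {
        if (newReportedTimeSecs != null) {
            // Only a changed reported time counts as a fresh heartbeat.
            if (newReportedTimeSecs.intValue() != executorReportedTimeSecs) {
                nimbusTimeSecs = clockSecs.getAsInt();
            }
            executorReportedTimeSecs = newReportedTimeSecs;
        }
        updateTimeout(timeoutSecs);
    }
}
```

Because the timeout is measured on Nimbus's own clock, the executor's reported time is only used to detect progress, so clock skew between the worker and Nimbus does not shift the timeout.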


---