Posted to dev@storm.apache.org by zd-project <gi...@git.apache.org> on 2018/08/10 17:25:26 UTC

[GitHub] storm pull request #2800: STORM-3162: Fix concurrent modification bug

GitHub user zd-project opened a pull request:

    https://github.com/apache/storm/pull/2800

    STORM-3162: Fix concurrent modification bug

    See: https://issues.apache.org/jira/browse/STORM-3162
    
    I altered the Java implementation, and it seems to pass the storm-server tests. However, storm-core contains a nimbus_test.clj that depends on parts of the old implementation I changed in this PR. I'm not very familiar with Clojure, so I don't know how to fix it. It would be great if someone could take a look.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zd-project/storm STORM-3162

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/2800.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2800
    
----
commit e7bacee9ecc2c136a7ef3c5d3144eae2d95a9306
Author: Zhengdai Hu <zh...@...>
Date:   2018-08-08T21:33:19Z

    STORM-3162: Fix concurrent modification bug

----


---

[GitHub] storm pull request #2800: STORM-3162: Fix concurrent modification bug

Posted by danny0405 <gi...@git.apache.org>.
Github user danny0405 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2800#discussion_r212800892
  
    --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
    @@ -490,7 +493,7 @@ public Nimbus(Map<String, Object> conf, INimbus inimbus, IStormClusterState stor
                 stormClusterState = makeStormClusterState(conf);
             }
             this.stormClusterState = stormClusterState;
    -        this.heartbeatsCache = new AtomicReference<>(new HashMap<>());
    +        this.heartbeatsCache = new ConcurrentHashMap<>();
    --- End diff --
    
    Please change this back to `AtomicReference`, because it guarantees visibility across threads. The Thrift server serves the RPC methods from multiple threads, so modifications to heartbeatsCache should become visible to other threads as soon as possible.


---

[GitHub] storm issue #2800: STORM-3162: Fix concurrent modification bug

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/2800
  
    I'm sorry if this turns out a bit verbose, but I'm going to write down what I see as the issue here, so we can hopefully come to a common understanding (and so I don't forget and have to look at this again).
    
    As far as I can tell, the uses of `heartbeatsCache` in Nimbus are thread safe, because the values are never modified, just overwritten. That is, we don't do `heartbeatsCache.get(topoId).put(foo, bar)`, instead we do `heartbeatsCache.getAndUpdate(func)`, which replaces the value entirely. I don't believe we need further synchronization here, since the `AtomicReference` ensures that the value changes are propagated to all threads, and two threads reading from an effectively immutable map at the same time should be fine(?)
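    
    To make the copy-on-write argument concrete, here is a minimal sketch (not Storm code) of how an update through `AtomicReference.getAndUpdate` behaves when the update function returns a fresh map. `assoc` is a hypothetical stand-in for Storm's `Assoc` helper. The unmodifiable wrapper is an added assumption that makes the "effectively immutable" property explicit.
    
    ```java
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.atomic.AtomicReference;
    
    class CopyOnWriteCache {
        // Hypothetical stand-in for Storm's Assoc: returns a NEW map with key -> value
        // added and never mutates the map it was given.
        static <K, V> Map<K, V> assoc(Map<K, V> current, K key, V value) {
            Map<K, V> copy = new HashMap<>(current);
            copy.put(key, value);
            return Collections.unmodifiableMap(copy);
        }
    
        public static void main(String[] args) {
            AtomicReference<Map<String, Integer>> cache =
                new AtomicReference<>(Collections.<String, Integer>emptyMap());
    
            Map<String, Integer> snapshot = cache.get();          // a reader's view
            cache.getAndUpdate(m -> assoc(m, "topo-1", 42));       // a writer swaps in a new map
    
            System.out.println(snapshot.containsKey("topo-1"));    // false: old snapshot untouched
            System.out.println(cache.get().get("topo-1"));         // 42
        }
    }
    ```
    
    Note that `getAndUpdate` may apply the function more than once under contention, so the function has to be free of side effects.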
    
    However in the `updateHeartbeatCache` method in StatsUtil https://github.com/apache/storm/blob/4c42ee3d259d5d90a4e7d3445d1c119601eec6c7/storm-client/src/jvm/org/apache/storm/stats/StatsUtil.java#L1565 we take one of the values from `heartbeatsCache` and modify it.
    
    There are a couple of problems here. First, the `cache` value is a regular `HashMap` and not a `ConcurrentHashMap`, so modifying it from two threads at once isn't safe. Second, in the branch in `updateHeartbeatCache` where `executorBeats` is null, we iterate over the `cache` parameter. If one thread is in the iteration, and another thread is in the other branch in `updateHeartbeatCache`, we get the `ConcurrentModificationException`.
    
    The reason this exception isn't thrown in a real cluster is that the `executorBeats` parameter is only null when called from https://github.com/apache/storm/blob/4c42ee3d259d5d90a4e7d3445d1c119601eec6c7/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java#L1681
    
    This only happens when Nimbus is booting up as part of `launchServer`, or when someone triggers a rebalance in the topology. We see it in the tests, because Nimbus and the supervisors are started concurrently, so Nimbus can be in one branch in `StatsUtil.updateHeartbeatCache` while one of the supervisors is in the other branch. It can technically happen in a real cluster, but someone would have to get extremely unlucky with rebalance timing.
    
    I think the fix here should be making sure that `StatsUtil.updateHeartbeatCache` is thread safe. One option is to make the `cache` value a `ConcurrentHashMap`. Another option would be to make `updateHeartbeatCache` create and return a new map, instead of modifying the existing one.
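    
    Here is a rough sketch of the second option. It assumes a simplified beat layout: only the `nimbus-time` and `is-timed-out` keys from StatsUtil are modelled, and time is passed in as plain seconds. The handling of fresh beats is a hypothetical simplification of `updateExecutorCache`. The method never writes to `cache` or to the beat maps inside it, so a snapshot published through the `AtomicReference` stays immutable.
    
    ```java
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    
    class PureHeartbeatUpdate {
        // Returns a new cache; the input map and its inner beat maps are left untouched.
        static Map<List<Integer>, Map<String, Object>> updateHeartbeatCache(
                Map<List<Integer>, Map<String, Object>> cache,
                Map<List<Integer>, Map<String, Object>> executorBeats,
                Set<List<Integer>> executors,
                int nowSecs, int timeoutSecs) {
            Map<List<Integer>, Map<String, Object>> result = new HashMap<>();
            if (executorBeats == null) {
                // Nimbus refresh: copy each beat and recompute is-timed-out on the copy.
                for (Map.Entry<List<Integer>, Map<String, Object>> e : cache.entrySet()) {
                    Map<String, Object> beat = new HashMap<>(e.getValue());
                    int nimbusTime = (Integer) beat.get("nimbus-time");
                    beat.put("is-timed-out", nowSecs - nimbusTime >= timeoutSecs);
                    result.put(e.getKey(), beat);
                }
                return result;
            }
            // Heartbeat report: start from the old entries, then replace the reported ones.
            // Hypothetical simplification: a reported beat just resets nimbus-time.
            result.putAll(cache);
            for (List<Integer> executor : executors) {
                if (executorBeats.containsKey(executor)) {
                    Map<String, Object> beat = new HashMap<>();
                    beat.put("nimbus-time", nowSecs);
                    beat.put("is-timed-out", false);
                    result.put(executor, beat);
                }
            }
            return result;
        }
    }
    ```
    
    The result could then be published with something like `heartbeatsCache.getAndUpdate(new Assoc<>(topoId, newCache))`, so every write stays copy-on-write.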


---

[GitHub] storm pull request #2800: STORM-3162: Fix concurrent modification bug

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2800#discussion_r210639246
  
    --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
    @@ -1660,26 +1658,18 @@ private TopologyDetails readTopologyDetails(String topoId, StormBase base) throw
     
         private void updateHeartbeatsFromZkHeartbeat(String topoId, Set<List<Integer>> allExecutors, Assignment existingAssignment) {
             LOG.debug("Updating heartbeats for {} {} (from ZK heartbeat)", topoId, allExecutors);
    -        IStormClusterState state = stormClusterState;
             Map<List<Integer>, Map<String, Object>> executorBeats =
    -            StatsUtil.convertExecutorBeats(state.executorBeats(topoId, existingAssignment.get_executor_node_port()));
    -        Map<List<Integer>, Map<String, Object>> cache = StatsUtil.updateHeartbeatCacheFromZkHeartbeat(heartbeatsCache.get().get(topoId),
    -                                                                                                      executorBeats, allExecutors,
    -                                                                                                      ObjectReader.getInt(conf.get(
    -                                                                                                          DaemonConfig
    -                                                                                                              .NIMBUS_TASK_TIMEOUT_SECS)));
    -        heartbeatsCache.getAndUpdate(new Assoc<>(topoId, cache));
    +            StatsUtil.convertExecutorBeats(stormClusterState.executorBeats(topoId, existingAssignment.get_executor_node_port()));
    +        heartbeatsCache.compute(topoId, (k, v) ->
    +                //Guaranteed side-effect-free
    --- End diff --
    
    We should probably put this requirement in comments on the two StatsUtil methods rather than here, so that they stay side-effect free.
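    
    Here is a minimal sketch of what that could look like. It assumes a hypothetical pure helper, `withBeat`, in place of the StatsUtil methods, and puts the requirement in the helper's Javadoc rather than at the call site. For reference, the two Javadocs say different things:
    
    - The `ConcurrentHashMap.compute` Javadoc asks for a short, simple remapping function that does not update other mappings of the map.
    - The `AtomicReference.getAndUpdate` Javadoc asks for a side-effect-free function, because it may be re-applied.
    
    ```java
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    
    class ComputeSketch {
        /**
         * Hypothetical stand-in for the StatsUtil update methods.
         * Must stay side-effect free: it reads {@code current} but never mutates it,
         * touches no other state, and returns a fresh map instead.
         */
        static Map<String, Integer> withBeat(Map<String, Integer> current, String executor, int time) {
            Map<String, Integer> next = new HashMap<>();
            if (current != null) {
                next.putAll(current);
            }
            next.put(executor, time);
            return next;
        }
    
        public static void main(String[] args) {
            ConcurrentHashMap<String, Map<String, Integer>> heartbeatsCache = new ConcurrentHashMap<>();
            // compute() applies the remapping function atomically for the given key.
            heartbeatsCache.compute("topo-1", (k, v) -> withBeat(v, "[1 1]", 10));
            heartbeatsCache.compute("topo-1", (k, v) -> withBeat(v, "[2 2]", 12));
            System.out.println(heartbeatsCache.get("topo-1").size()); // 2
        }
    }
    ```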


---

[GitHub] storm issue #2800: STORM-3162: Fix concurrent modification bug

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/2800
  
    I fixed the nimbus_test in https://github.com/zd-project/storm/pull/1


---

[GitHub] storm pull request #2800: STORM-3162: Fix concurrent modification bug

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2800#discussion_r210633216
  
    --- Diff: storm-client/src/jvm/org/apache/storm/stats/StatsUtil.java ---
    @@ -1565,23 +1565,26 @@ public static ComponentPageInfo aggCompExecsStats(
         public static void updateHeartbeatCache(Map<List<Integer>, Map<String, Object>> cache,
                                                 Map<List<Integer>, Map<String, Object>> executorBeats, Set<List<Integer>> executors,
                                                 Integer timeout) {
    -        //if not executor beats, refresh is-timed-out of the cache which is done by master
    +        assert cache instanceof ConcurrentMap;
    +        //Should we enforce update-if-newer policy?
             if (executorBeats == null) {
    -            for (Map.Entry<List<Integer>, Map<String, Object>> executorbeat : cache.entrySet()) {
    -                Map<String, Object> beat = executorbeat.getValue();
    +            //If not executor beats, refresh is-timed-out of the cache which is done by master
    +            for (Map.Entry<List<Integer>, Map<String, Object>> executorBeat : cache.entrySet()) {
    +                Map<String, Object> beat = executorBeat.getValue();
                     beat.put("is-timed-out", Time.deltaSecs((Integer) beat.get("nimbus-time")) >= timeout);
                 }
    -            return;
    -        }
    -        //else refresh nimbus-time and executor-reported-time by heartbeats reporting
    -        for (List<Integer> executor : executors) {
    -            cache.put(executor, updateExecutorCache(cache.get(executor), executorBeats.get(executor), timeout));
    +        } else {
    --- End diff --
    
    Nit: An explicit return helps reduce indentation; I liked it better before.


---

[GitHub] storm issue #2800: STORM-3162: Fix concurrent modification bug

Posted by danny0405 <gi...@git.apache.org>.
Github user danny0405 commented on the issue:

    https://github.com/apache/storm/pull/2800
  
    @srdo 
    Okay, I checked the JDK 8 source of `AtomicReference` and found this snippet:
    ```java
    public final V getAndUpdate(UnaryOperator<V> updateFunction) {
        V prev, next;
        do {
            prev = get();
            next = updateFunction.apply(prev);
        } while (!compareAndSet(prev, next));
        return prev;
    }
    ```
    In Storm's use cases, we never update the
    ```java
    AtomicReference<Map<String, Map<List<Integer>, Map<String, Object>>>> heartbeatsCache
    ```
    reference itself, so we are effectively using it as a plain HashMap, and thread safety is lost.
    
    I think `ConcurrentHashMap` is better.


---

[GitHub] storm issue #2800: STORM-3162: Fix concurrent modification bug

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/2800
  
    @danny0405 Could you elaborate on why fixing the `executorBeats == null` branch is enough? My concern is that the other branch modifies a HashMap (the `cache` parameter) from multiple threads with no synchronization. Why is this safe?


---

[GitHub] storm issue #2800: STORM-3162: Fix concurrent modification bug

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/2800
  
    Please take a look at #2836 as an alternative. It is a much bigger patch, but I think the refactoring it does will make things much easier in the long term. Having done the other patch, I think this one does fix the immediate issue, but the current HB cache is so complex that I didn't feel I understood all of the ways it was accessed until I did the other patch.


---

[GitHub] storm pull request #2800: STORM-3162: Fix concurrent modification bug

Posted by zd-project <gi...@git.apache.org>.
Github user zd-project commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2800#discussion_r212801069
  
    --- Diff: storm-client/src/jvm/org/apache/storm/stats/StatsUtil.java ---
    @@ -1525,27 +1528,24 @@ public static ComponentPageInfo aggCompExecsStats(
          * @param timeout       timeout
          * @return a HashMap of updated executor heart beats
          */
    -    public static Map<List<Integer>, Map<String, Object>> updateHeartbeatCacheFromZkHeartbeat(Map<List<Integer>, Map<String, Object>> cache,
    -                                                                                              Map<List<Integer>, Map<String, Object>>
    -                                                                                                  executorBeats,
    -                                                                                              Set<List<Integer>> executors,
    -                                                                                              Integer timeout) {
    -        Map<List<Integer>, Map<String, Object>> ret = new HashMap<>();
    -        if (cache == null && executorBeats == null) {
    -            return ret;
    -        }
    -
    +    public static ConcurrentMap<List<Integer>, Map<String, Object>> updateHeartbeatCacheFromZkHeartbeat(Map<List<Integer>, Map<String, Object>> cache,
    +                                                                                                        Map<List<Integer>, Map<String, Object>>
    +                                                                                                                executorBeats,
    +                                                                                                        Set<List<Integer>> executors,
    +                                                                                                        Integer timeout) {
             if (cache == null) {
    +            if (executorBeats == null) {
    --- End diff --
    
    The very same map created here will be used in `updateHeartbeatCache`, which may be modified concurrently there. Hope this answered your question.


---

[GitHub] storm pull request #2800: STORM-3162: Fix concurrent modification bug

Posted by danny0405 <gi...@git.apache.org>.
Github user danny0405 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2800#discussion_r212801184
  
    --- Diff: storm-client/src/jvm/org/apache/storm/stats/StatsUtil.java ---
    @@ -1525,27 +1528,24 @@ public static ComponentPageInfo aggCompExecsStats(
          * @param timeout       timeout
          * @return a HashMap of updated executor heart beats
          */
    -    public static Map<List<Integer>, Map<String, Object>> updateHeartbeatCacheFromZkHeartbeat(Map<List<Integer>, Map<String, Object>> cache,
    -                                                                                              Map<List<Integer>, Map<String, Object>>
    -                                                                                                  executorBeats,
    -                                                                                              Set<List<Integer>> executors,
    -                                                                                              Integer timeout) {
    -        Map<List<Integer>, Map<String, Object>> ret = new HashMap<>();
    -        if (cache == null && executorBeats == null) {
    -            return ret;
    -        }
    -
    +    public static ConcurrentMap<List<Integer>, Map<String, Object>> updateHeartbeatCacheFromZkHeartbeat(Map<List<Integer>, Map<String, Object>> cache,
    +                                                                                                        Map<List<Integer>, Map<String, Object>>
    +                                                                                                                executorBeats,
    +                                                                                                        Set<List<Integer>> executors,
    +                                                                                                        Integer timeout) {
             if (cache == null) {
    +            if (executorBeats == null) {
    --- End diff --
    
    Concurrently modifying a HashMap is OK as long as we are not also iterating over it; for heartbeat updates we only need eventual consistency.


---

[GitHub] storm pull request #2800: STORM-3162: Fix concurrent modification bug

Posted by zd-project <gi...@git.apache.org>.
Github user zd-project commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2800#discussion_r212801285
  
    --- Diff: storm-client/src/jvm/org/apache/storm/stats/StatsUtil.java ---
    @@ -1565,23 +1565,26 @@ public static ComponentPageInfo aggCompExecsStats(
         public static void updateHeartbeatCache(Map<List<Integer>, Map<String, Object>> cache,
                                                 Map<List<Integer>, Map<String, Object>> executorBeats, Set<List<Integer>> executors,
                                                 Integer timeout) {
    -        //if not executor beats, refresh is-timed-out of the cache which is done by master
    +        assert cache instanceof ConcurrentMap;
    +        //Should we enforce update-if-newer policy?
             if (executorBeats == null) {
    -            for (Map.Entry<List<Integer>, Map<String, Object>> executorbeat : cache.entrySet()) {
    -                Map<String, Object> beat = executorbeat.getValue();
    +            //If not executor beats, refresh is-timed-out of the cache which is done by master
    --- End diff --
    
    This is actually where the `ConcurrentModificationException` is thrown. Notice that the old code invokes both `cache.entrySet()` and `cache.put()` in this method. Since it's exposed through Thrift, a `ConcurrentModificationException` is possible. See the Travis log here for an example: https://travis-ci.org/apache/storm/jobs/408719153#L1897
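    
    The fail-fast check behind this exception can be reproduced deterministically on one thread with a plain `HashMap`. This is a toy example, not the Storm code. Per the `HashMap` Javadoc, only structural modifications (adding or removing mappings) trip the check; replacing the value of an existing key does not. In the race described above, the structural `put` comes from a different Thrift thread. That makes the failure timing-dependent, and fail-fast detection across threads is only best-effort.
    
    ```java
    import java.util.ConcurrentModificationException;
    import java.util.HashMap;
    import java.util.Map;
    
    class CmeDemo {
        // Inserts a NEW key while an entrySet() loop is running: a structural modification.
        static boolean putDuringIteration() {
            Map<String, Integer> cache = new HashMap<>();
            cache.put("a", 1);
            cache.put("b", 2);
            try {
                for (Map.Entry<String, Integer> e : cache.entrySet()) {
                    cache.put("c", 3);
                }
                return false;
            } catch (ConcurrentModificationException ex) {
                return true;
            }
        }
    
        // Replaces values of EXISTING keys during the loop: not a structural modification.
        static boolean replaceDuringIteration() {
            Map<String, Integer> cache = new HashMap<>();
            cache.put("a", 1);
            cache.put("b", 2);
            try {
                for (Map.Entry<String, Integer> e : cache.entrySet()) {
                    cache.put(e.getKey(), e.getValue() + 1);
                }
                return false;
            } catch (ConcurrentModificationException ex) {
                return true;
            }
        }
    
        public static void main(String[] args) {
            System.out.println(putDuringIteration());     // true
            System.out.println(replaceDuringIteration()); // false
        }
    }
    ```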


---

[GitHub] storm issue #2800: STORM-3162: Fix concurrent modification bug

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/2800
  
    @danny0405 That doesn't sound safe to me. I think you're right that it works fine most of the time, but if there are key collisions or an insert leads the map to get resized, I would think that two threads modifying the map at the same time could interfere with each other.
    
    Either way, if you're okay with making the whole function thread safe, I think we should do it.


---

[GitHub] storm issue #2800: STORM-3162: Fix concurrent modification bug

Posted by danny0405 <gi...@git.apache.org>.
Github user danny0405 commented on the issue:

    https://github.com/apache/storm/pull/2800
  
    Please double-check why the `ConcurrentModificationException` happens and attach the stack trace.


---

[GitHub] storm issue #2800: STORM-3162: Fix concurrent modification bug

Posted by danny0405 <gi...@git.apache.org>.
Github user danny0405 commented on the issue:

    https://github.com/apache/storm/pull/2800
  
    @srdo @zd-project 
    Thanks for the explanation, that makes sense to me.
    
    One thing to clarify: the `executorBeats` parameter of `StatsUtil#updateHeartbeatCache` is null in every scheduling round of the master, in order to refresh the `is-timed-out` flag.
    
    It is possible for a supervisor/worker to enter this code branch:
    https://github.com/apache/storm/blob/4c42ee3d259d5d90a4e7d3445d1c119601eec6c7/storm-client/src/jvm/org/apache/storm/stats/StatsUtil.java#L1576-L1579
    
    while Nimbus is in the other:
    https://github.com/apache/storm/blob/4c42ee3d259d5d90a4e7d3445d1c119601eec6c7/storm-client/src/jvm/org/apache/storm/stats/StatsUtil.java#L1568-L1574
    
    I think the key issue is that we use a `forEach` iteration over the cache. We could change it to an iterator loop, which is fine because we only need eventual consistency. That avoids a ConcurrentMap or a copy, either of which would cause a performance regression.


---

[GitHub] storm pull request #2800: STORM-3162: Fix concurrent modification bug

Posted by danny0405 <gi...@git.apache.org>.
Github user danny0405 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2800#discussion_r212800820
  
    --- Diff: storm-client/src/jvm/org/apache/storm/stats/StatsUtil.java ---
    @@ -1565,23 +1565,26 @@ public static ComponentPageInfo aggCompExecsStats(
         public static void updateHeartbeatCache(Map<List<Integer>, Map<String, Object>> cache,
                                                 Map<List<Integer>, Map<String, Object>> executorBeats, Set<List<Integer>> executors,
                                                 Integer timeout) {
    -        //if not executor beats, refresh is-timed-out of the cache which is done by master
    +        assert cache instanceof ConcurrentMap;
    +        //Should we enforce update-if-newer policy?
             if (executorBeats == null) {
    -            for (Map.Entry<List<Integer>, Map<String, Object>> executorbeat : cache.entrySet()) {
    -                Map<String, Object> beat = executorbeat.getValue();
    +            //If not executor beats, refresh is-timed-out of the cache which is done by master
    --- End diff --
    
    This code branch can only be invoked by Nimbus, and it is always modified from a single thread, so please verify whether it can actually throw a `ConcurrentModificationException`.


---

[GitHub] storm pull request #2800: STORM-3162: Fix concurrent modification bug

Posted by danny0405 <gi...@git.apache.org>.
Github user danny0405 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2800#discussion_r212801634
  
    --- Diff: storm-client/src/jvm/org/apache/storm/stats/StatsUtil.java ---
    @@ -1565,23 +1565,26 @@ public static ComponentPageInfo aggCompExecsStats(
         public static void updateHeartbeatCache(Map<List<Integer>, Map<String, Object>> cache,
                                                 Map<List<Integer>, Map<String, Object>> executorBeats, Set<List<Integer>> executors,
                                                 Integer timeout) {
    -        //if not executor beats, refresh is-timed-out of the cache which is done by master
    +        assert cache instanceof ConcurrentMap;
    +        //Should we enforce update-if-newer policy?
             if (executorBeats == null) {
    -            for (Map.Entry<List<Integer>, Map<String, Object>> executorbeat : cache.entrySet()) {
    -                Map<String, Object> beat = executorbeat.getValue();
    +            //If not executor beats, refresh is-timed-out of the cache which is done by master
    --- End diff --
    
    Then please check where the method is invoked with `executorBeats == null`. From `sendSupervisorWorkerHeartbeat` we never get null, at worst an empty map.
    
    For the tests, I believe there is some bug to fix, but this code change is not really necessary.


---

[GitHub] storm issue #2800: STORM-3162: Fix concurrent modification bug

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/2800
  
    @danny0405 Thanks for explaining. I'm not sure I understand why the scheduling thread will see older values with `ConcurrentHashMap` than with `AtomicReference`? It was my understanding that `ConcurrentHashMap` had the same happens-before guarantees as volatile variables for reads/writes?


---

[GitHub] storm issue #2800: STORM-3162: Fix concurrent modification bug

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/2800
  
    @zd-project I'd like to finish this up. Let me know if you want to make the last couple of fixes, otherwise I'll open a new PR containing this fix.
    
    @danny0405 I thought about it a bit more, and while I still think we can fix this by making `updateHeartbeatCache` thread safe by making it return a new map and keeping the pre-this-PR AtomicReference in Nimbus, I'm not sure why this would be faster than just using a ConcurrentHashMap like the current PR code here does? Using the AtomicReference in Nimbus essentially makes the heartbeat cache a copy-on-write Map due to the way we do updates via Assoc and Dissoc. I would expect a ConcurrentHashMap to provide better parallelism. What do you think?


---

[GitHub] storm pull request #2800: STORM-3162: Fix concurrent modification bug

Posted by zd-project <gi...@git.apache.org>.
Github user zd-project commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2800#discussion_r212801204
  
    --- Diff: storm-client/src/jvm/org/apache/storm/stats/StatsUtil.java ---
    @@ -1565,23 +1565,26 @@ public static ComponentPageInfo aggCompExecsStats(
         public static void updateHeartbeatCache(Map<List<Integer>, Map<String, Object>> cache,
                                                 Map<List<Integer>, Map<String, Object>> executorBeats, Set<List<Integer>> executors,
                                                 Integer timeout) {
    -        //if not executor beats, refresh is-timed-out of the cache which is done by master
    +        assert cache instanceof ConcurrentMap;
    +        //Should we enforce update-if-newer policy?
             if (executorBeats == null) {
    -            for (Map.Entry<List<Integer>, Map<String, Object>> executorbeat : cache.entrySet()) {
    -                Map<String, Object> beat = executorbeat.getValue();
    +            //If not executor beats, refresh is-timed-out of the cache which is done by master
    --- End diff --
    
    I believe this is wrapped in another method exposed through the Thrift API; see `sendSupervisorWorkerHeartbeat`.


---

[GitHub] storm issue #2800: STORM-3162: Fix concurrent modification bug

Posted by danny0405 <gi...@git.apache.org>.
Github user danny0405 commented on the issue:

    https://github.com/apache/storm/pull/2800
  
    @srdo 
    Because we only modify it from multiple threads and never iterate over it, and the cache key is the executor ID, conflicts can only occur between the master and a supervisor.


---

[GitHub] storm issue #2800: STORM-3162: Fix concurrent modification bug

Posted by danny0405 <gi...@git.apache.org>.
Github user danny0405 commented on the issue:

    https://github.com/apache/storm/pull/2800
  
    @srdo 
    I think the only difference is that, compared to `AtomicReference`, `ConcurrentHashMap` is thread safe but cannot ensure that the value read is up to date. That can cause inconsistent behavior. The scheduling thread reads the cache every 10s, and with an out-of-date heartbeat cache the master may kill a healthy worker or restart one that has already started.


---

[GitHub] storm issue #2800: STORM-3162: Fix concurrent modification bug

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/2800
  
    I don't think fixing the `executorBeats == null` branch is enough. As far as I can tell, two supervisors/workers can be in the https://github.com/apache/storm/blob/4c42ee3d259d5d90a4e7d3445d1c119601eec6c7/storm-client/src/jvm/org/apache/storm/stats/StatsUtil.java#L1576-L1579 branch at the same time for the same topology. We won't get an exception if this happens, but we'll still be modifying a HashMap from two threads at the same time, which isn't safe.
    
    Regarding fixing the `executorBeats == null` branch, it isn't enough to switch to an iterator, since iterators have the same behavior as a `forEach` loop (throws exception if underlying collection is concurrently modified). 
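    
    Here is a small single-threaded sketch of this point, using toy data rather than Storm code. An explicit `Iterator` over a `HashMap` runs the same fail-fast check as a for-each loop. By contrast, `ConcurrentHashMap` iterators are weakly consistent and never throw `ConcurrentModificationException`.
    
    ```java
    import java.util.ConcurrentModificationException;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    
    class IteratorDemo {
        // Walks the map with an explicit Iterator and inserts a new key during the walk.
        static boolean throwsCme(Map<String, Integer> cache) {
            cache.put("a", 1);
            cache.put("b", 2);
            try {
                Iterator<Map.Entry<String, Integer>> it = cache.entrySet().iterator();
                while (it.hasNext()) {
                    it.next();
                    cache.put("c", 3);
                }
                return false;
            } catch (ConcurrentModificationException ex) {
                return true;
            }
        }
    
        public static void main(String[] args) {
            System.out.println(throwsCme(new HashMap<>()));           // true: same check as for-each
            System.out.println(throwsCme(new ConcurrentHashMap<>())); // false: weakly consistent
        }
    }
    ```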


---

[GitHub] storm issue #2800: STORM-3162: Fix concurrent modification bug

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on the issue:

    https://github.com/apache/storm/pull/2800
  
    Regarding performance, consider that Nimbus is already copying `heartbeatsCache` on writes everywhere else https://github.com/apache/storm/blob/master/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java#L4636-L4639.
    
    Changing `StatsUtil.updateHeartbeatCache` to return a new Map and using `Assoc` to update `heartbeatsCache` would be my preferred solution.


---

[GitHub] storm pull request #2800: STORM-3162: Fix concurrent modification bug

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2800#discussion_r210632176
  
    --- Diff: storm-client/src/jvm/org/apache/storm/stats/StatsUtil.java ---
    @@ -1525,27 +1528,24 @@ public static ComponentPageInfo aggCompExecsStats(
          * @param timeout       timeout
          * @return a HashMap of updated executor heart beats
          */
    -    public static Map<List<Integer>, Map<String, Object>> updateHeartbeatCacheFromZkHeartbeat(Map<List<Integer>, Map<String, Object>> cache,
    -                                                                                              Map<List<Integer>, Map<String, Object>>
    -                                                                                                  executorBeats,
    -                                                                                              Set<List<Integer>> executors,
    -                                                                                              Integer timeout) {
    -        Map<List<Integer>, Map<String, Object>> ret = new HashMap<>();
    -        if (cache == null && executorBeats == null) {
    -            return ret;
    -        }
    -
    +    public static ConcurrentMap<List<Integer>, Map<String, Object>> updateHeartbeatCacheFromZkHeartbeat(Map<List<Integer>, Map<String, Object>> cache,
    +                                                                                                        Map<List<Integer>, Map<String, Object>>
    +                                                                                                                executorBeats,
    +                                                                                                        Set<List<Integer>> executors,
    +                                                                                                        Integer timeout) {
             if (cache == null) {
    +            if (executorBeats == null) {
    --- End diff --
    
    The construction here seems a little odd. Initializing cache and executorBeats when null isn't necessary. I think we should keep the "if cache and executorBeats are null" clause, then replace the other two branches with something like `Map<String, Object> currBeat = cache == null ? null : cache.get(executor);` and equivalent for `newBeat`.
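    
    Here is a rough sketch of the suggested shape. It assumes a hypothetical, simplified `updateExecutorCache`; the real StatsUtil helper also tracks executor-reported times and timeouts. It also assumes plain-seconds timestamps.
    
    ```java
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    
    class ZkHeartbeatSketch {
        static Map<List<Integer>, Map<String, Object>> updateHeartbeatCacheFromZkHeartbeat(
                Map<List<Integer>, Map<String, Object>> cache,
                Map<List<Integer>, Map<String, Object>> executorBeats,
                Set<List<Integer>> executors,
                int nowSecs) {
            Map<List<Integer>, Map<String, Object>> ret = new HashMap<>();
            if (cache == null && executorBeats == null) {
                return ret; // keep the original early exit
            }
            for (List<Integer> executor : executors) {
                // No need to replace null arguments with empty maps; guard at the lookup instead.
                Map<String, Object> currBeat = cache == null ? null : cache.get(executor);
                Map<String, Object> newBeat = executorBeats == null ? null : executorBeats.get(executor);
                ret.put(executor, updateExecutorCache(currBeat, newBeat, nowSecs));
            }
            return ret;
        }
    
        // Hypothetical, simplified stand-in for StatsUtil.updateExecutorCache:
        // carries the cached entry over and resets nimbus-time when a new beat arrived.
        static Map<String, Object> updateExecutorCache(Map<String, Object> currBeat,
                                                       Map<String, Object> newBeat, int nowSecs) {
            Map<String, Object> ret = new HashMap<>();
            if (currBeat != null) {
                ret.putAll(currBeat);
            }
            if (newBeat != null) {
                ret.put("nimbus-time", nowSecs);
            }
            return ret;
        }
    }
    ```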


---

[GitHub] storm issue #2800: STORM-3162: Fix concurrent modification bug

Posted by danny0405 <gi...@git.apache.org>.
Github user danny0405 commented on the issue:

    https://github.com/apache/storm/pull/2800
  
    @srdo 
    I agree that copying is the better solution.


---

[GitHub] storm pull request #2800: STORM-3162: Fix concurrent modification bug

Posted by danny0405 <gi...@git.apache.org>.
Github user danny0405 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2800#discussion_r212800719
  
    --- Diff: storm-client/src/jvm/org/apache/storm/stats/StatsUtil.java ---
    @@ -1525,27 +1528,24 @@ public static ComponentPageInfo aggCompExecsStats(
          * @param timeout       timeout
          * @return a HashMap of updated executor heart beats
          */
    -    public static Map<List<Integer>, Map<String, Object>> updateHeartbeatCacheFromZkHeartbeat(Map<List<Integer>, Map<String, Object>> cache,
    -                                                                                              Map<List<Integer>, Map<String, Object>>
    -                                                                                                  executorBeats,
    -                                                                                              Set<List<Integer>> executors,
    -                                                                                              Integer timeout) {
    -        Map<List<Integer>, Map<String, Object>> ret = new HashMap<>();
    -        if (cache == null && executorBeats == null) {
    -            return ret;
    -        }
    -
    +    public static ConcurrentMap<List<Integer>, Map<String, Object>> updateHeartbeatCacheFromZkHeartbeat(Map<List<Integer>, Map<String, Object>> cache,
    +                                                                                                        Map<List<Integer>, Map<String, Object>>
    +                                                                                                                executorBeats,
    +                                                                                                        Set<List<Integer>> executors,
    +                                                                                                        Integer timeout) {
             if (cache == null) {
    +            if (executorBeats == null) {
    --- End diff --
    
    Sorry, I don't see any reason why this method is not thread safe. It is essentially a utility method that only initializes a Map cache, which is then stored into the Nimbus heartbeatsCache through `heartbeatsCache.getAndUpdate(new Assoc<>(topoId, cache))`. A `ConcurrentModificationException` happens when we iterate over a collection with an iterator and also modify it. Here, we only iterate over the executor list and do not modify any of its entries.


---

[GitHub] storm issue #2800: STORM-3162: Fix concurrent modification bug

Posted by zd-project <gi...@git.apache.org>.
Github user zd-project commented on the issue:

    https://github.com/apache/storm/pull/2800
  
    As far as I understand the code, this fix should resolve this particular issue. However, my understanding of the HB cache was limited, and the PR was pushed hastily during the last few days of my internship. I definitely support switching to the alternative if it actually solves the deeper structural issue.
    
    Please let me know if there's anything else I can help with.


---

[GitHub] storm issue #2800: STORM-3162: Fix concurrent modification bug

Posted by zd-project <gi...@git.apache.org>.
Github user zd-project commented on the issue:

    https://github.com/apache/storm/pull/2800
  
    Agreed. I think the `AtomicReference` is really just there for Clojure compatibility. I'll finish this up.


---