Posted to dev@storm.apache.org by jmartell7 <gi...@git.apache.org> on 2017/11/01 17:21:01 UTC

[GitHub] storm pull request #2399: Track network data metrics STORM-2793

GitHub user jmartell7 opened a pull request:

    https://github.com/apache/storm/pull/2399

    Track network data metrics STORM-2793

    Add metrics to count data transfer to account for tuples of differing sizes.  @revans2, please review when you get a chance. 
    
    https://issues.apache.org/jira/browse/STORM-2793

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/jmartell7/storm tuple_size_metrics_STORM-2793

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/2399.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2399
    
----
commit f46209e72cb7ad5b657dafa153b332d3680e785c
Author: Joshua Martell <jm...@yahoo-inc.com>
Date:   2017-11-01T17:17:24Z

    Track network data metrics STORM-2793

----


---

[GitHub] storm issue #2399: Track network data metrics STORM-2793

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/2399
  
    It looks like the patch addresses a slightly different point than the issue description in STORM-2793. 
    
    Below is the issue description in STORM-2793:
    
    > Existing Storm metrics track tuple counts, but don't account for the size of the tuple payload.  If a task always gets large tuples, it can be slower than a task that gets small tuples without showing any reason why.  It would be good to track the byte counts as well so data skew can be observed directly.
    
    From the description, I imagined that the issue was meant to track tuple sizes in bytes for every task (maybe with sampling), but this patch only tracks `transferred` tuples, which ignores local tasks.
    
    I can still see the benefit of measuring transferred tuples, but I would just like to check that the patch fits the original intention of the issue.
    
    In addition, we currently allow a map to be used as a metric value and let the metrics collector handle it however it likes. That should be changed, but we could resolve it along with the other metrics later.
    
    Overall looks good to me, and it would be much better if we could see the performance impact and confirm it's small enough.


---

[GitHub] storm issue #2399: STORM-2793 Track network data metrics

Posted by jmartell7 <gi...@git.apache.org>.
Github user jmartell7 commented on the issue:

    https://github.com/apache/storm/pull/2399
  
    Thank you, everyone, for the reviews.  @HeartSaVioR, I've squashed the commits.


---

[GitHub] storm pull request #2399: Track network data metrics STORM-2793

Posted by jmartell7 <gi...@git.apache.org>.
Github user jmartell7 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2399#discussion_r148328791
  
    --- Diff: storm-client/src/jvm/org/apache/storm/messaging/DeserializingConnectionCallback.java ---
    @@ -17,44 +17,102 @@
      */
     package org.apache.storm.messaging;
     
    +import org.apache.storm.Config;
     import org.apache.storm.daemon.worker.WorkerState;
    +import org.apache.storm.metric.api.IMetric;
    +import org.apache.storm.serialization.KryoTupleDeserializer;
     import org.apache.storm.task.GeneralTopologyContext;
     import org.apache.storm.tuple.AddressedTuple;
    -import org.apache.storm.serialization.KryoTupleDeserializer;
    +import org.apache.storm.tuple.Tuple;
     
     import java.util.ArrayList;
    +import java.util.HashMap;
     import java.util.List;
     import java.util.Map;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.atomic.AtomicLong;
    +
     
     /**
      * A class that is called when a TaskMessage arrives.
      */
    -public class DeserializingConnectionCallback implements IConnectionCallback {
    +public class DeserializingConnectionCallback implements IConnectionCallback, IMetric {
         private final WorkerState.ILocalTransferCallback _cb;
         private final Map _conf;
         private final GeneralTopologyContext _context;
    +
         private final ThreadLocal<KryoTupleDeserializer> _des =
    -         new ThreadLocal<KryoTupleDeserializer>() {
    -             @Override
    -             protected KryoTupleDeserializer initialValue() {
    -                 return new KryoTupleDeserializer(_conf, _context);
    -             }
    -         };
    -
    -    public DeserializingConnectionCallback(final Map<String, Object> conf, final GeneralTopologyContext context, WorkerState.ILocalTransferCallback callback) {
    +        new ThreadLocal<KryoTupleDeserializer>() {
    +            @Override
    +            protected KryoTupleDeserializer initialValue() {
    +                return new KryoTupleDeserializer(_conf, _context);
    +            }
    +        };
    +
    +    // Track serialized size of messages
    +    private final boolean _sizeMetricsEnabled;
    +    private final ConcurrentHashMap<String, AtomicLong> _byteCounts = new ConcurrentHashMap<>();
    +
    +
    +    public DeserializingConnectionCallback(final Map conf, final GeneralTopologyContext context, WorkerState.ILocalTransferCallback callback) {
             _conf = conf;
             _context = context;
             _cb = callback;
    +        _sizeMetricsEnabled = _conf.get(Config.TOPOLOGY_SERIALIZED_MESSAGE_SIZE_METRICS) instanceof Boolean &&
    +                              (Boolean) _conf.get(Config.TOPOLOGY_SERIALIZED_MESSAGE_SIZE_METRICS);
    +
         }
     
         @Override
         public void recv(List<TaskMessage> batch) {
             KryoTupleDeserializer des = _des.get();
             ArrayList<AddressedTuple> ret = new ArrayList<>(batch.size());
             for (TaskMessage message: batch) {
    -            ret.add(new AddressedTuple(message.task(), des.deserialize(message.message())));
    +            Tuple tuple = des.deserialize(message.message());
    +            AddressedTuple addrTuple = new AddressedTuple(message.task(), tuple);
    +            updateMetrics(tuple.getSourceTask(), message);
    +            ret.add(addrTuple);
             }
             _cb.transfer(ret);
         }
     
    +    @Override
    +    public Object getValueAndReset() {
    +        HashMap<String, Long> outMap = new HashMap<>();
    +
    +        if (_sizeMetricsEnabled) { // Possible race conditions
    +            for (Map.Entry<String, AtomicLong> ent : _byteCounts.entrySet()) {
    +                AtomicLong count = ent.getValue();
    +                if (count.get() > 0) {
    +                    outMap.put(ent.getKey(), count.getAndSet(0L));
    +                }
    +            }
    +        }
    +        return outMap;
    +    }
    +
    +    /**
    +     * Update serialized byte counts for each message
    +     * @param sourceTaskId source task
    +     * @param message serialized message
    +     */
    +    protected void updateMetrics(int sourceTaskId, TaskMessage message) {
    +        if (_sizeMetricsEnabled) { // Possible race conditions
    +            int dest = message.task();
    +            int len = message.message().length;
    +            String key = Integer.toString(sourceTaskId) + "-" + Integer.toString(dest);
    --- End diff --
    
    Should this use the ComponentId instead?
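
One way to picture the question above is a key built from component names rather than raw task ids. The following is only a sketch under stated assumptions: `ComponentKeySketch` and its `taskToComponent` map are hypothetical stand-ins for whatever task-to-component lookup the worker has available, and are not taken from the patch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-alone class; not part of the Storm code base.
class ComponentKeySketch {

    // Builds a "source-destination" key from component names.
    // taskToComponent is an assumed lookup table from task id to component id.
    static String key(Map<Integer, String> taskToComponent, int sourceTaskId, int destTaskId) {
        return name(taskToComponent, sourceTaskId) + "-" + name(taskToComponent, destTaskId);
    }

    // Falls back to the numeric task id when the task is unknown.
    private static String name(Map<Integer, String> taskToComponent, int taskId) {
        return taskToComponent.getOrDefault(taskId, Integer.toString(taskId));
    }

    public static void main(String[] args) {
        Map<Integer, String> taskToComponent = new HashMap<>();
        taskToComponent.put(1, "spout");
        taskToComponent.put(2, "split");
        System.out.println(key(taskToComponent, 1, 2));
    }
}
```

A possible trade-off to weigh: component-level keys merge the counters of all tasks in a component, so skew between individual tasks would no longer be visible in this metric.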


---

[GitHub] storm pull request #2399: Track network data metrics STORM-2793

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2399#discussion_r148328916
  
    --- Diff: storm-client/src/jvm/org/apache/storm/messaging/DeserializingConnectionCallback.java ---
    @@ -17,44 +17,102 @@
      */
     package org.apache.storm.messaging;
     
    +import org.apache.storm.Config;
     import org.apache.storm.daemon.worker.WorkerState;
    +import org.apache.storm.metric.api.IMetric;
    +import org.apache.storm.serialization.KryoTupleDeserializer;
     import org.apache.storm.task.GeneralTopologyContext;
     import org.apache.storm.tuple.AddressedTuple;
    -import org.apache.storm.serialization.KryoTupleDeserializer;
    +import org.apache.storm.tuple.Tuple;
     
     import java.util.ArrayList;
    +import java.util.HashMap;
     import java.util.List;
     import java.util.Map;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.atomic.AtomicLong;
    +
     
     /**
      * A class that is called when a TaskMessage arrives.
      */
    -public class DeserializingConnectionCallback implements IConnectionCallback {
    +public class DeserializingConnectionCallback implements IConnectionCallback, IMetric {
         private final WorkerState.ILocalTransferCallback _cb;
         private final Map _conf;
         private final GeneralTopologyContext _context;
    +
         private final ThreadLocal<KryoTupleDeserializer> _des =
    -         new ThreadLocal<KryoTupleDeserializer>() {
    -             @Override
    -             protected KryoTupleDeserializer initialValue() {
    -                 return new KryoTupleDeserializer(_conf, _context);
    -             }
    -         };
    -
    -    public DeserializingConnectionCallback(final Map<String, Object> conf, final GeneralTopologyContext context, WorkerState.ILocalTransferCallback callback) {
    +        new ThreadLocal<KryoTupleDeserializer>() {
    +            @Override
    +            protected KryoTupleDeserializer initialValue() {
    +                return new KryoTupleDeserializer(_conf, _context);
    +            }
    +        };
    +
    +    // Track serialized size of messages
    +    private final boolean _sizeMetricsEnabled;
    +    private final ConcurrentHashMap<String, AtomicLong> _byteCounts = new ConcurrentHashMap<>();
    +
    +
    +    public DeserializingConnectionCallback(final Map conf, final GeneralTopologyContext context, WorkerState.ILocalTransferCallback callback) {
             _conf = conf;
             _context = context;
             _cb = callback;
    +        _sizeMetricsEnabled = _conf.get(Config.TOPOLOGY_SERIALIZED_MESSAGE_SIZE_METRICS) instanceof Boolean &&
    +                              (Boolean) _conf.get(Config.TOPOLOGY_SERIALIZED_MESSAGE_SIZE_METRICS);
    +
         }
     
         @Override
         public void recv(List<TaskMessage> batch) {
             KryoTupleDeserializer des = _des.get();
             ArrayList<AddressedTuple> ret = new ArrayList<>(batch.size());
             for (TaskMessage message: batch) {
    -            ret.add(new AddressedTuple(message.task(), des.deserialize(message.message())));
    +            Tuple tuple = des.deserialize(message.message());
    +            AddressedTuple addrTuple = new AddressedTuple(message.task(), tuple);
    +            updateMetrics(tuple.getSourceTask(), message);
    +            ret.add(addrTuple);
             }
             _cb.transfer(ret);
         }
     
    +    @Override
    +    public Object getValueAndReset() {
    +        HashMap<String, Long> outMap = new HashMap<>();
    +
    +        if (_sizeMetricsEnabled) { // Possible race conditions
    +            for (Map.Entry<String, AtomicLong> ent : _byteCounts.entrySet()) {
    +                AtomicLong count = ent.getValue();
    +                if (count.get() > 0) {
    +                    outMap.put(ent.getKey(), count.getAndSet(0L));
    +                }
    +            }
    +        }
    +        return outMap;
    +    }
    +
    +    /**
    +     * Update serialized byte counts for each message
    +     * @param sourceTaskId source task
    +     * @param message serialized message
    +     */
    +    protected void updateMetrics(int sourceTaskId, TaskMessage message) {
    +        if (_sizeMetricsEnabled) { // Possible race conditions
    --- End diff --
    
    Here too, the comment is a bit confusing.


---

[GitHub] storm pull request #2399: Track network data metrics STORM-2793

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2399#discussion_r148330545
  
    --- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java ---
    @@ -249,6 +250,15 @@ public Object getState() {
                 }
             }
             ret.put("enqueued", enqueued);
    +        
    +        // Report messageSizes metric
    +        if (_cb instanceof IMetric) {
    +            Object metrics = ((IMetric) _cb).getValueAndReset();
    +            if(metrics instanceof Map && !((Map) metrics).isEmpty()) {
    --- End diff --
    
    Also fitting with what we had previously.  It would be nice to allow an empty map, but not do anything on null.  This would match what we do in other places working with IMetrics.
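
The behaviour requested above (report an empty map, skip only on null) could look roughly like the sketch below. It is not the `Server.getState()` code: `StateReportSketch`, the `report` helper, and the `"messageBytes"` key are hypothetical names chosen for illustration, and `metricValue` stands in for the result of `getValueAndReset()`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-alone class; not part of the Storm code base.
class StateReportSketch {

    // metricValue stands in for the object returned by getValueAndReset().
    static Map<String, Object> report(Object metricValue) {
        Map<String, Object> ret = new HashMap<>();
        // Only null is skipped; an empty map is still reported,
        // which tells the consumer "enabled, but no traffic".
        if (metricValue != null) {
            ret.put("messageBytes", metricValue); // hypothetical key name
        }
        return ret;
    }

    public static void main(String[] args) {
        System.out.println(report(null));
        System.out.println(report(new HashMap<String, Long>()));
    }
}
```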


---

[GitHub] storm pull request #2399: STORM-2793 Track network data metrics

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/2399


---

[GitHub] storm pull request #2399: Track network data metrics STORM-2793

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2399#discussion_r148327252
  
    --- Diff: storm-client/src/jvm/org/apache/storm/Config.java ---
    @@ -451,6 +451,12 @@
         public static final String TOPOLOGY_METRICS_CONSUMER_REGISTER = "topology.metrics.consumer.register";
     
         /**
    +     * Enable tracking of network message byte counts per source-destination task
    --- End diff --
    
    It would be good to add this to defaults.yaml, just so it is clear what the default is.  It would also be nice to explain why it is disabled by default.  At least from the code, that looks like the intention.


---

[GitHub] storm pull request #2399: Track network data metrics STORM-2793

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2399#discussion_r148327705
  
    --- Diff: storm-client/src/jvm/org/apache/storm/messaging/DeserializingConnectionCallback.java ---
    @@ -17,44 +17,102 @@
      */
     package org.apache.storm.messaging;
     
    +import org.apache.storm.Config;
     import org.apache.storm.daemon.worker.WorkerState;
    +import org.apache.storm.metric.api.IMetric;
    +import org.apache.storm.serialization.KryoTupleDeserializer;
     import org.apache.storm.task.GeneralTopologyContext;
     import org.apache.storm.tuple.AddressedTuple;
    -import org.apache.storm.serialization.KryoTupleDeserializer;
    +import org.apache.storm.tuple.Tuple;
     
     import java.util.ArrayList;
    +import java.util.HashMap;
     import java.util.List;
     import java.util.Map;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.atomic.AtomicLong;
    +
     
     /**
      * A class that is called when a TaskMessage arrives.
      */
    -public class DeserializingConnectionCallback implements IConnectionCallback {
    +public class DeserializingConnectionCallback implements IConnectionCallback, IMetric {
         private final WorkerState.ILocalTransferCallback _cb;
         private final Map _conf;
         private final GeneralTopologyContext _context;
    +
         private final ThreadLocal<KryoTupleDeserializer> _des =
    -         new ThreadLocal<KryoTupleDeserializer>() {
    -             @Override
    -             protected KryoTupleDeserializer initialValue() {
    -                 return new KryoTupleDeserializer(_conf, _context);
    -             }
    -         };
    -
    -    public DeserializingConnectionCallback(final Map<String, Object> conf, final GeneralTopologyContext context, WorkerState.ILocalTransferCallback callback) {
    +        new ThreadLocal<KryoTupleDeserializer>() {
    +            @Override
    +            protected KryoTupleDeserializer initialValue() {
    +                return new KryoTupleDeserializer(_conf, _context);
    +            }
    +        };
    +
    +    // Track serialized size of messages
    +    private final boolean _sizeMetricsEnabled;
    +    private final ConcurrentHashMap<String, AtomicLong> _byteCounts = new ConcurrentHashMap<>();
    --- End diff --
    
    I know this fits the convention we have been using for member variables in this file, but it would be good to follow the new style guides and remove the '_'.  If you want to update the other member variables in the file too that would be great.


---

[GitHub] storm issue #2399: STORM-2793 Track network data metrics

Posted by jmartell7 <gi...@git.apache.org>.
Github user jmartell7 commented on the issue:

    https://github.com/apache/storm/pull/2399
  
    Performance didn't seem to be affected by adding the metric. Using ThroughputVsLatency on my MBP (2.3 GHz i7, 16GB RAM) with 35k tuples / sec, 4 workers, 4 splitters, 4 counters, 4 spouts, I saw the following metrics:  Avg latency was 14.3ms with master and 14.0ms with the metric enabled, 99% latency was 17.0 and 16.9 respectively, and cpu was 3.05 and 3.09 respectively.  Additional details are available for the asking. 
    



---

[GitHub] storm pull request #2399: Track network data metrics STORM-2793

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2399#discussion_r148330111
  
    --- Diff: storm-client/src/jvm/org/apache/storm/messaging/DeserializingConnectionCallback.java ---
    @@ -17,44 +17,102 @@
      */
     package org.apache.storm.messaging;
     
    +import org.apache.storm.Config;
     import org.apache.storm.daemon.worker.WorkerState;
    +import org.apache.storm.metric.api.IMetric;
    +import org.apache.storm.serialization.KryoTupleDeserializer;
     import org.apache.storm.task.GeneralTopologyContext;
     import org.apache.storm.tuple.AddressedTuple;
    -import org.apache.storm.serialization.KryoTupleDeserializer;
    +import org.apache.storm.tuple.Tuple;
     
     import java.util.ArrayList;
    +import java.util.HashMap;
     import java.util.List;
     import java.util.Map;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.atomic.AtomicLong;
    +
     
     /**
      * A class that is called when a TaskMessage arrives.
      */
    -public class DeserializingConnectionCallback implements IConnectionCallback {
    +public class DeserializingConnectionCallback implements IConnectionCallback, IMetric {
         private final WorkerState.ILocalTransferCallback _cb;
         private final Map _conf;
         private final GeneralTopologyContext _context;
    +
         private final ThreadLocal<KryoTupleDeserializer> _des =
    -         new ThreadLocal<KryoTupleDeserializer>() {
    -             @Override
    -             protected KryoTupleDeserializer initialValue() {
    -                 return new KryoTupleDeserializer(_conf, _context);
    -             }
    -         };
    -
    -    public DeserializingConnectionCallback(final Map<String, Object> conf, final GeneralTopologyContext context, WorkerState.ILocalTransferCallback callback) {
    +        new ThreadLocal<KryoTupleDeserializer>() {
    +            @Override
    +            protected KryoTupleDeserializer initialValue() {
    +                return new KryoTupleDeserializer(_conf, _context);
    +            }
    +        };
    +
    +    // Track serialized size of messages
    +    private final boolean _sizeMetricsEnabled;
    +    private final ConcurrentHashMap<String, AtomicLong> _byteCounts = new ConcurrentHashMap<>();
    +
    +
    +    public DeserializingConnectionCallback(final Map conf, final GeneralTopologyContext context, WorkerState.ILocalTransferCallback callback) {
             _conf = conf;
             _context = context;
             _cb = callback;
    +        _sizeMetricsEnabled = _conf.get(Config.TOPOLOGY_SERIALIZED_MESSAGE_SIZE_METRICS) instanceof Boolean &&
    +                              (Boolean) _conf.get(Config.TOPOLOGY_SERIALIZED_MESSAGE_SIZE_METRICS);
    +
         }
     
         @Override
         public void recv(List<TaskMessage> batch) {
             KryoTupleDeserializer des = _des.get();
             ArrayList<AddressedTuple> ret = new ArrayList<>(batch.size());
             for (TaskMessage message: batch) {
    -            ret.add(new AddressedTuple(message.task(), des.deserialize(message.message())));
    +            Tuple tuple = des.deserialize(message.message());
    +            AddressedTuple addrTuple = new AddressedTuple(message.task(), tuple);
    +            updateMetrics(tuple.getSourceTask(), message);
    +            ret.add(addrTuple);
             }
             _cb.transfer(ret);
         }
     
    +    @Override
    +    public Object getValueAndReset() {
    +        HashMap<String, Long> outMap = new HashMap<>();
    +
    +        if (_sizeMetricsEnabled) { // Possible race conditions
    +            for (Map.Entry<String, AtomicLong> ent : _byteCounts.entrySet()) {
    +                AtomicLong count = ent.getValue();
    +                if (count.get() > 0) {
    +                    outMap.put(ent.getKey(), count.getAndSet(0L));
    +                }
    +            }
    +        }
    +        return outMap;
    +    }
    +
    +    /**
    +     * Update serialized byte counts for each message
    +     * @param sourceTaskId source task
    +     * @param message serialized message
    +     */
    +    protected void updateMetrics(int sourceTaskId, TaskMessage message) {
    +        if (_sizeMetricsEnabled) { // Possible race conditions
    +            int dest = message.task();
    +            int len = message.message().length;
    +            String key = Integer.toString(sourceTaskId) + "-" + Integer.toString(dest);
    +
    +            AtomicLong count = _byteCounts.get(key);
    --- End diff --
    
    In java 8 there is now a computeIfAbsent that would make this a lot simpler.
    
    ```
    byteCounts.computeIfAbsent(key, (k) -> new AtomicLong(0)).addAndGet(len);
    ```
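
For reference, a minimal self-contained sketch of the `computeIfAbsent` pattern suggested above. The class name `ByteCountSketch` and the `record` method are hypothetical and exist only for this illustration; the key format mirrors the one in the diff. Note that the lambda parameter cannot reuse the name of a local variable that is already in scope, so it is called `k` here.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-alone class; not part of the Storm code base.
class ByteCountSketch {
    private final ConcurrentHashMap<String, AtomicLong> byteCounts = new ConcurrentHashMap<>();

    // Adds len bytes to the counter for the source-destination pair
    // and returns the running total for that pair.
    long record(int sourceTaskId, int destTaskId, int len) {
        String key = sourceTaskId + "-" + destTaskId;
        // computeIfAbsent creates the counter atomically on first use.
        return byteCounts.computeIfAbsent(key, k -> new AtomicLong(0)).addAndGet(len);
    }

    public static void main(String[] args) {
        ByteCountSketch sketch = new ByteCountSketch();
        sketch.record(1, 2, 100);
        System.out.println(sketch.record(1, 2, 50));
    }
}
```

Because `computeIfAbsent` on a `ConcurrentHashMap` is atomic, the separate get, null check, and `putIfAbsent` steps are not needed.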


---

[GitHub] storm pull request #2399: Track network data metrics STORM-2793

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2399#discussion_r148328743
  
    --- Diff: storm-client/src/jvm/org/apache/storm/messaging/DeserializingConnectionCallback.java ---
    @@ -17,44 +17,102 @@
      */
     package org.apache.storm.messaging;
     
    +import org.apache.storm.Config;
     import org.apache.storm.daemon.worker.WorkerState;
    +import org.apache.storm.metric.api.IMetric;
    +import org.apache.storm.serialization.KryoTupleDeserializer;
     import org.apache.storm.task.GeneralTopologyContext;
     import org.apache.storm.tuple.AddressedTuple;
    -import org.apache.storm.serialization.KryoTupleDeserializer;
    +import org.apache.storm.tuple.Tuple;
     
     import java.util.ArrayList;
    +import java.util.HashMap;
     import java.util.List;
     import java.util.Map;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.atomic.AtomicLong;
    +
     
     /**
      * A class that is called when a TaskMessage arrives.
      */
    -public class DeserializingConnectionCallback implements IConnectionCallback {
    +public class DeserializingConnectionCallback implements IConnectionCallback, IMetric {
         private final WorkerState.ILocalTransferCallback _cb;
         private final Map _conf;
         private final GeneralTopologyContext _context;
    +
         private final ThreadLocal<KryoTupleDeserializer> _des =
    -         new ThreadLocal<KryoTupleDeserializer>() {
    -             @Override
    -             protected KryoTupleDeserializer initialValue() {
    -                 return new KryoTupleDeserializer(_conf, _context);
    -             }
    -         };
    -
    -    public DeserializingConnectionCallback(final Map<String, Object> conf, final GeneralTopologyContext context, WorkerState.ILocalTransferCallback callback) {
    +        new ThreadLocal<KryoTupleDeserializer>() {
    +            @Override
    +            protected KryoTupleDeserializer initialValue() {
    +                return new KryoTupleDeserializer(_conf, _context);
    +            }
    +        };
    +
    +    // Track serialized size of messages
    +    private final boolean _sizeMetricsEnabled;
    +    private final ConcurrentHashMap<String, AtomicLong> _byteCounts = new ConcurrentHashMap<>();
    +
    +
    +    public DeserializingConnectionCallback(final Map conf, final GeneralTopologyContext context, WorkerState.ILocalTransferCallback callback) {
             _conf = conf;
             _context = context;
             _cb = callback;
    +        _sizeMetricsEnabled = _conf.get(Config.TOPOLOGY_SERIALIZED_MESSAGE_SIZE_METRICS) instanceof Boolean &&
    +                              (Boolean) _conf.get(Config.TOPOLOGY_SERIALIZED_MESSAGE_SIZE_METRICS);
    +
         }
     
         @Override
         public void recv(List<TaskMessage> batch) {
             KryoTupleDeserializer des = _des.get();
             ArrayList<AddressedTuple> ret = new ArrayList<>(batch.size());
             for (TaskMessage message: batch) {
    -            ret.add(new AddressedTuple(message.task(), des.deserialize(message.message())));
    +            Tuple tuple = des.deserialize(message.message());
    +            AddressedTuple addrTuple = new AddressedTuple(message.task(), tuple);
    +            updateMetrics(tuple.getSourceTask(), message);
    +            ret.add(addrTuple);
             }
             _cb.transfer(ret);
         }
     
    +    @Override
    +    public Object getValueAndReset() {
    +        HashMap<String, Long> outMap = new HashMap<>();
    +
    +        if (_sizeMetricsEnabled) { // Possible race conditions
    --- End diff --
    
    Where is the race and what are we doing to mitigate it?  The comment is a bit confusing.


---

[GitHub] storm pull request #2399: Track network data metrics STORM-2793

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2399#discussion_r148330213
  
    --- Diff: storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java ---
    @@ -249,6 +250,15 @@ public Object getState() {
                 }
             }
             ret.put("enqueued", enqueued);
    +        
    +        // Report messageSizes metric
    +        if (_cb instanceof IMetric) {
    +            Object metrics = ((IMetric) _cb).getValueAndReset();
    +            if(metrics instanceof Map && !((Map) metrics).isEmpty()) {
    --- End diff --
    
    nit: space after the if


---

[GitHub] storm issue #2399: STORM-2793 Track network data metrics

Posted by jmartell7 <gi...@git.apache.org>.
Github user jmartell7 commented on the issue:

    https://github.com/apache/storm/pull/2399
  
    @HeartSaVioR , I've updated the title.  The overhead of serializing the local tuples to measure the size didn't seem worth it, and local tuples generally become a smaller fraction of the total as the number of workers increases.  
    
    I'm working on performance numbers using ThroughputVsLatency, but am open to other ideas. 


---

[GitHub] storm pull request #2399: Track network data metrics STORM-2793

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2399#discussion_r148329501
  
    --- Diff: storm-client/src/jvm/org/apache/storm/messaging/DeserializingConnectionCallback.java ---
    @@ -17,44 +17,102 @@
      */
     package org.apache.storm.messaging;
     
    +import org.apache.storm.Config;
     import org.apache.storm.daemon.worker.WorkerState;
    +import org.apache.storm.metric.api.IMetric;
    +import org.apache.storm.serialization.KryoTupleDeserializer;
     import org.apache.storm.task.GeneralTopologyContext;
     import org.apache.storm.tuple.AddressedTuple;
    -import org.apache.storm.serialization.KryoTupleDeserializer;
    +import org.apache.storm.tuple.Tuple;
     
     import java.util.ArrayList;
    +import java.util.HashMap;
     import java.util.List;
     import java.util.Map;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.atomic.AtomicLong;
    +
     
     /**
      * A class that is called when a TaskMessage arrives.
      */
    -public class DeserializingConnectionCallback implements IConnectionCallback {
    +public class DeserializingConnectionCallback implements IConnectionCallback, IMetric {
         private final WorkerState.ILocalTransferCallback _cb;
         private final Map _conf;
         private final GeneralTopologyContext _context;
    +
         private final ThreadLocal<KryoTupleDeserializer> _des =
    -         new ThreadLocal<KryoTupleDeserializer>() {
    -             @Override
    -             protected KryoTupleDeserializer initialValue() {
    -                 return new KryoTupleDeserializer(_conf, _context);
    -             }
    -         };
    -
    -    public DeserializingConnectionCallback(final Map<String, Object> conf, final GeneralTopologyContext context, WorkerState.ILocalTransferCallback callback) {
    +        new ThreadLocal<KryoTupleDeserializer>() {
    +            @Override
    +            protected KryoTupleDeserializer initialValue() {
    +                return new KryoTupleDeserializer(_conf, _context);
    +            }
    +        };
    +
    +    // Track serialized size of messages
    +    private final boolean _sizeMetricsEnabled;
    +    private final ConcurrentHashMap<String, AtomicLong> _byteCounts = new ConcurrentHashMap<>();
    +
    +
    +    public DeserializingConnectionCallback(final Map conf, final GeneralTopologyContext context, WorkerState.ILocalTransferCallback callback) {
             _conf = conf;
             _context = context;
             _cb = callback;
    +        _sizeMetricsEnabled = _conf.get(Config.TOPOLOGY_SERIALIZED_MESSAGE_SIZE_METRICS) instanceof Boolean &&
    +                              (Boolean) _conf.get(Config.TOPOLOGY_SERIALIZED_MESSAGE_SIZE_METRICS);
    +
         }
     
         @Override
         public void recv(List<TaskMessage> batch) {
             KryoTupleDeserializer des = _des.get();
             ArrayList<AddressedTuple> ret = new ArrayList<>(batch.size());
             for (TaskMessage message: batch) {
    -            ret.add(new AddressedTuple(message.task(), des.deserialize(message.message())));
    +            Tuple tuple = des.deserialize(message.message());
    +            AddressedTuple addrTuple = new AddressedTuple(message.task(), tuple);
    +            updateMetrics(tuple.getSourceTask(), message);
    +            ret.add(addrTuple);
             }
             _cb.transfer(ret);
         }
     
    +    @Override
    +    public Object getValueAndReset() {
    +        HashMap<String, Long> outMap = new HashMap<>();
    --- End diff --
    
    Can we return a null if metrics are not enabled instead of an empty map?  A null means the metric will not be sent to the metrics collector, but an empty map might mean that the metrics are disabled or that there was no activity at all.
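
A sketch of the distinction described above, assuming a simplified stand-in for the callback: `SizeMetricSketch` and its `add` method are hypothetical, and only the null-versus-empty-map contract is taken from the comment.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-alone class; not part of the Storm code base.
class SizeMetricSketch {
    private final boolean sizeMetricsEnabled;
    private final ConcurrentHashMap<String, AtomicLong> byteCounts = new ConcurrentHashMap<>();

    SizeMetricSketch(boolean sizeMetricsEnabled) {
        this.sizeMetricsEnabled = sizeMetricsEnabled;
    }

    // Records len bytes for the given key when the metric is enabled.
    void add(String key, long len) {
        if (sizeMetricsEnabled) {
            byteCounts.computeIfAbsent(key, k -> new AtomicLong(0)).addAndGet(len);
        }
    }

    // Returns null when disabled, and a (possibly empty) map when enabled.
    Map<String, Long> getValueAndReset() {
        if (!sizeMetricsEnabled) {
            return null;
        }
        Map<String, Long> out = new HashMap<>();
        for (Map.Entry<String, AtomicLong> ent : byteCounts.entrySet()) {
            // Read and reset in one atomic step so no bytes are lost in between.
            long bytes = ent.getValue().getAndSet(0L);
            if (bytes > 0) {
                out.put(ent.getKey(), bytes);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        SizeMetricSketch enabled = new SizeMetricSketch(true);
        enabled.add("1-2", 10);
        System.out.println(enabled.getValueAndReset());
        System.out.println(new SizeMetricSketch(false).getValueAndReset());
    }
}
```

With this contract a consumer can tell "metric disabled" (null, nothing sent) apart from "metric enabled, no traffic in this interval" (empty map).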


---

[GitHub] storm pull request #2399: Track network data metrics STORM-2793

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2399#discussion_r148328186
  
    --- Diff: storm-client/src/jvm/org/apache/storm/messaging/DeserializingConnectionCallback.java ---
    @@ -17,44 +17,102 @@
      */
     package org.apache.storm.messaging;
     
    +import org.apache.storm.Config;
     import org.apache.storm.daemon.worker.WorkerState;
    +import org.apache.storm.metric.api.IMetric;
    +import org.apache.storm.serialization.KryoTupleDeserializer;
     import org.apache.storm.task.GeneralTopologyContext;
     import org.apache.storm.tuple.AddressedTuple;
    -import org.apache.storm.serialization.KryoTupleDeserializer;
    +import org.apache.storm.tuple.Tuple;
     
     import java.util.ArrayList;
    +import java.util.HashMap;
     import java.util.List;
     import java.util.Map;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.atomic.AtomicLong;
    +
     
     /**
      * A class that is called when a TaskMessage arrives.
      */
    -public class DeserializingConnectionCallback implements IConnectionCallback {
    +public class DeserializingConnectionCallback implements IConnectionCallback, IMetric {
         private final WorkerState.ILocalTransferCallback _cb;
         private final Map _conf;
         private final GeneralTopologyContext _context;
    +
         private final ThreadLocal<KryoTupleDeserializer> _des =
    -         new ThreadLocal<KryoTupleDeserializer>() {
    -             @Override
    -             protected KryoTupleDeserializer initialValue() {
    -                 return new KryoTupleDeserializer(_conf, _context);
    -             }
    -         };
    -
    -    public DeserializingConnectionCallback(final Map<String, Object> conf, final GeneralTopologyContext context, WorkerState.ILocalTransferCallback callback) {
    +        new ThreadLocal<KryoTupleDeserializer>() {
    +            @Override
    +            protected KryoTupleDeserializer initialValue() {
    +                return new KryoTupleDeserializer(_conf, _context);
    +            }
    +        };
    +
    +    // Track serialized size of messages
    +    private final boolean _sizeMetricsEnabled;
    +    private final ConcurrentHashMap<String, AtomicLong> _byteCounts = new ConcurrentHashMap<>();
    +
    +
    +    public DeserializingConnectionCallback(final Map conf, final GeneralTopologyContext context, WorkerState.ILocalTransferCallback callback) {
             _conf = conf;
             _context = context;
             _cb = callback;
    +        _sizeMetricsEnabled = _conf.get(Config.TOPOLOGY_SERIALIZED_MESSAGE_SIZE_METRICS) instanceof Boolean &&
    --- End diff --
    
    It might be simpler to use `ObjectReader.getBoolean` with a default of `false`:
    
    ```
    _sizeMetricsEnabled = ObjectReader.getBoolean(_conf.get(Config.TOPOLOGY_SERIALIZED_MESSAGE_SIZE_METRICS), false);
    ```
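
To illustrate why a single helper call reads more simply than the `instanceof` check plus cast, here is a self-contained sketch. `ConfigFlags.getBoolean` is a hypothetical stand-in written for this example, not Storm's `ObjectReader`; its treatment of non-Boolean values (throwing) is an assumption, and the config key string is likewise assumed.

```java
import java.util.HashMap;
import java.util.Map;

class ConfigFlags {
    // Assumed key name, used here for illustration only.
    static final String SIZE_METRICS_KEY = "topology.serialized.message.size.metrics";

    // Hypothetical helper: missing value -> default, Boolean -> its value,
    // anything else -> error rather than a silent false.
    static boolean getBoolean(Object value, boolean defaultValue) {
        if (value == null) {
            return defaultValue;
        }
        if (value instanceof Boolean) {
            return (Boolean) value;
        }
        throw new IllegalArgumentException("Don't know how to convert " + value + " to boolean");
    }

    // One lookup and one call replace the repeated conf.get(...) in the diff.
    static boolean sizeMetricsEnabled(Map<String, Object> conf) {
        return getBoolean(conf.get(SIZE_METRICS_KEY), false);
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        System.out.println(sizeMetricsEnabled(conf));
        conf.put(SIZE_METRICS_KEY, Boolean.TRUE);
        System.out.println(sizeMetricsEnabled(conf));
    }
}
```

One difference from the original two-line check is worth noting: in this sketch a misconfigured non-Boolean value fails loudly instead of quietly disabling the metric.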


---

[GitHub] storm issue #2399: Track network data metrics STORM-2793

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/storm/pull/2399
  
    And please change the title to 'STORM-2793 Track network data metrics' to conform to https://github.com/apache/storm/blob/master/DEVELOPER.md#create-a-pull-request
    



---

[GitHub] storm pull request #2399: Track network data metrics STORM-2793

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2399#discussion_r148328987
  
    --- Diff: storm-client/src/jvm/org/apache/storm/messaging/DeserializingConnectionCallback.java ---
    @@ -17,44 +17,102 @@
      */
     package org.apache.storm.messaging;
     
    +import org.apache.storm.Config;
     import org.apache.storm.daemon.worker.WorkerState;
    +import org.apache.storm.metric.api.IMetric;
    +import org.apache.storm.serialization.KryoTupleDeserializer;
     import org.apache.storm.task.GeneralTopologyContext;
     import org.apache.storm.tuple.AddressedTuple;
    -import org.apache.storm.serialization.KryoTupleDeserializer;
    +import org.apache.storm.tuple.Tuple;
     
     import java.util.ArrayList;
    +import java.util.HashMap;
     import java.util.List;
     import java.util.Map;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.atomic.AtomicLong;
    +
     
     /**
      * A class that is called when a TaskMessage arrives.
      */
    -public class DeserializingConnectionCallback implements IConnectionCallback {
    +public class DeserializingConnectionCallback implements IConnectionCallback, IMetric {
         private final WorkerState.ILocalTransferCallback _cb;
         private final Map _conf;
         private final GeneralTopologyContext _context;
    +
         private final ThreadLocal<KryoTupleDeserializer> _des =
    -         new ThreadLocal<KryoTupleDeserializer>() {
    -             @Override
    -             protected KryoTupleDeserializer initialValue() {
    -                 return new KryoTupleDeserializer(_conf, _context);
    -             }
    -         };
    -
    -    public DeserializingConnectionCallback(final Map<String, Object> conf, final GeneralTopologyContext context, WorkerState.ILocalTransferCallback callback) {
    +        new ThreadLocal<KryoTupleDeserializer>() {
    +            @Override
    +            protected KryoTupleDeserializer initialValue() {
    +                return new KryoTupleDeserializer(_conf, _context);
    +            }
    +        };
    +
    +    // Track serialized size of messages
    +    private final boolean _sizeMetricsEnabled;
    +    private final ConcurrentHashMap<String, AtomicLong> _byteCounts = new ConcurrentHashMap<>();
    +
    +
    +    public DeserializingConnectionCallback(final Map conf, final GeneralTopologyContext context, WorkerState.ILocalTransferCallback callback) {
             _conf = conf;
             _context = context;
             _cb = callback;
    +        _sizeMetricsEnabled = _conf.get(Config.TOPOLOGY_SERIALIZED_MESSAGE_SIZE_METRICS) instanceof Boolean &&
    +                              (Boolean) _conf.get(Config.TOPOLOGY_SERIALIZED_MESSAGE_SIZE_METRICS);
    +
         }
     
         @Override
         public void recv(List<TaskMessage> batch) {
             KryoTupleDeserializer des = _des.get();
             ArrayList<AddressedTuple> ret = new ArrayList<>(batch.size());
             for (TaskMessage message: batch) {
    -            ret.add(new AddressedTuple(message.task(), des.deserialize(message.message())));
    +            Tuple tuple = des.deserialize(message.message());
    +            AddressedTuple addrTuple = new AddressedTuple(message.task(), tuple);
    +            updateMetrics(tuple.getSourceTask(), message);
    +            ret.add(addrTuple);
             }
             _cb.transfer(ret);
         }
     
    +    @Override
    +    public Object getValueAndReset() {
    +        HashMap<String, Long> outMap = new HashMap<>();
    +
    +        if (_sizeMetricsEnabled) { // Possible race conditions
    +            for (Map.Entry<String, AtomicLong> ent : _byteCounts.entrySet()) {
    +                AtomicLong count = ent.getValue();
    +                if (count.get() > 0) {
    +                    outMap.put(ent.getKey(), count.getAndSet(0L));
    +                }
    +            }
    +        }
    +        return outMap;
    +    }
    +
    +    /**
    +     * Update serialized byte counts for each message
    --- End diff --
    
    nit: please put a '.' at the end of the first sentence in the javadoc.


---