Posted to dev@storm.apache.org by omkreddy <gi...@git.apache.org> on 2017/09/05 05:50:00 UTC

[GitHub] storm pull request #2308: MINOR: Update DruidBeamBolt logs

GitHub user omkreddy opened a pull request:

    https://github.com/apache/storm/pull/2308

    MINOR: Update DruidBeamBolt logs

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/omkreddy/storm druidlog

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/2308.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2308
    
----
commit 8adef5fbcd243aeea055400bed5efbc977b097e3
Author: Manikumar Reddy O <ma...@gmail.com>
Date:   2017-08-29T08:31:00Z

    MINOR: Update DruidBeamBolt logs

----


---

[GitHub] storm issue #2308: MINOR: Update DruidBeamBolt logs

Posted by omkreddy <gi...@git.apache.org>.
Github user omkreddy commented on the issue:

    https://github.com/apache/storm/pull/2308
  
    @revans2 Thanks for the review. The Druid client drops messages in certain scenarios, such as an invalid timestamp, data source, etc. These changes are mainly meant to help debug those dropped messages.
    
    1. We transform/update the incoming tuple, so printing the updated event is useful for debugging.
    2. Print the exception message for MessageDroppedExceptions.


---

[GitHub] storm pull request #2308: MINOR: Update DruidBeamBolt logs

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2308#discussion_r136990714
  
    --- Diff: external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java ---
    @@ -78,27 +76,28 @@ public void prepare(Map stormConf, TopologyContext context, OutputCollector coll
     
         @Override
         protected void process(final Tuple tuple) {
    -        Future future = tranquilizer.send((druidEventMapper.getEvent(tuple)));
    -        LOG.debug("Sent tuple : [{}]", tuple);
    +        final E mappedEvent = druidEventMapper.getEvent(tuple);
    +        Future future = tranquilizer.send(mappedEvent);
    +        LOG.debug("Sent tuple : [{}]", mappedEvent);
     
             future.addEventListener(new FutureEventListener() {
                 @Override
                 public void onFailure(Throwable cause) {
                     if (cause instanceof MessageDroppedException) {
                         collector.ack(tuple);
    -                    LOG.debug("Tuple Dropped due to MessageDroppedException : [{}]", tuple);
    +                    LOG.debug("Tuple Dropped due to MessageDroppedException : [{}]", mappedEvent, cause);
    --- End diff --
    
    nit: I am assuming that you are adding the cause here because you want to debug it more.  Do you really need the full stack trace?  I don't expect it to differ from one occurrence to the next.  Could you instead set it to something like the following?
    
    ```
    LOG.debug("Tuple Dropped due to MessageDroppedException {} : [{}]", cause.getMessage(), mappedEvent);
    ```
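
To make the trade-off in this suggestion concrete: with SLF4J, a trailing Throwable argument that is not consumed by a `{}` placeholder is logged as an exception, stack trace included, which is why passing `cause` last is noisier than passing `cause.getMessage()`. The sketch below is a stdlib-only illustration and not the bolt's code; `DropLogDetail`, `DroppedException`, `shortForm`, `longForm`, and the sample event string are hypothetical names chosen for the example.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

public class DropLogDetail {
    // Hypothetical stand-in for the client's MessageDroppedException.
    static class DroppedException extends RuntimeException {
        DroppedException(String message) {
            super(message);
        }
    }

    // Short form: one line carrying only the exception message and the mapped event.
    static String shortForm(Throwable cause, Object event) {
        return "Tuple Dropped due to MessageDroppedException "
                + cause.getMessage() + " : [" + event + "]";
    }

    // Long form: the log line followed by the full stack trace of the cause.
    static String longForm(Throwable cause, Object event) {
        StringWriter buffer = new StringWriter();
        cause.printStackTrace(new PrintWriter(buffer, true));
        return "Tuple Dropped due to MessageDroppedException : [" + event + "]"
                + System.lineSeparator() + buffer;
    }

    public static void main(String[] args) {
        Throwable cause = new DroppedException("invalid timestamp");
        String event = "{timestamp=0, value=1}"; // assumed sample event
        System.out.println(shortForm(cause, event));
        System.out.println(longForm(cause, event));
    }
}
```

The short form stays on a single line per dropped event, while the long form repeats the same frames for every drop, which is the redundancy the comment above points at.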


---

[GitHub] storm pull request #2308: MINOR: Update DruidBeamBolt logs

Posted by omkreddy <gi...@git.apache.org>.
Github user omkreddy commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2308#discussion_r137043877
  
    --- Diff: external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java ---
    @@ -78,27 +76,28 @@ public void prepare(Map stormConf, TopologyContext context, OutputCollector coll
     
         @Override
         protected void process(final Tuple tuple) {
    -        Future future = tranquilizer.send((druidEventMapper.getEvent(tuple)));
    -        LOG.debug("Sent tuple : [{}]", tuple);
    +        final E mappedEvent = druidEventMapper.getEvent(tuple);
    +        Future future = tranquilizer.send(mappedEvent);
    +        LOG.debug("Sent tuple : [{}]", mappedEvent);
     
             future.addEventListener(new FutureEventListener() {
                 @Override
                 public void onFailure(Throwable cause) {
                     if (cause instanceof MessageDroppedException) {
                         collector.ack(tuple);
    -                    LOG.debug("Tuple Dropped due to MessageDroppedException : [{}]", tuple);
    +                    LOG.debug("Tuple Dropped due to MessageDroppedException : [{}]", mappedEvent, cause);
    --- End diff --
    
    The exception message should be sufficient. Updated the PR.


---

[GitHub] storm issue #2308: MINOR: Update DruidBeamBolt logs

Posted by omkreddy <gi...@git.apache.org>.
Github user omkreddy commented on the issue:

    https://github.com/apache/storm/pull/2308
  
    @arunmahadevan Please review.


---

[GitHub] storm pull request #2308: MINOR: Update DruidBeamBolt logs

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/2308


---

[GitHub] storm pull request #2308: MINOR: Update DruidBeamBolt logs

Posted by omkreddy <gi...@git.apache.org>.
Github user omkreddy commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2308#discussion_r137044069
  
    --- Diff: external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java ---
    @@ -78,27 +76,28 @@ public void prepare(Map stormConf, TopologyContext context, OutputCollector coll
     
         @Override
         protected void process(final Tuple tuple) {
    -        Future future = tranquilizer.send((druidEventMapper.getEvent(tuple)));
    -        LOG.debug("Sent tuple : [{}]", tuple);
    +        final E mappedEvent = druidEventMapper.getEvent(tuple);
    +        Future future = tranquilizer.send(mappedEvent);
    +        LOG.debug("Sent tuple : [{}]", mappedEvent);
     
             future.addEventListener(new FutureEventListener() {
                 @Override
                 public void onFailure(Throwable cause) {
                     if (cause instanceof MessageDroppedException) {
                         collector.ack(tuple);
    -                    LOG.debug("Tuple Dropped due to MessageDroppedException : [{}]", tuple);
    +                    LOG.debug("Tuple Dropped due to MessageDroppedException : [{}]", mappedEvent, cause);
                         if (druidConfig.getDiscardStreamId() != null)
                             collector.emit(druidConfig.getDiscardStreamId(), new Values(tuple, System.currentTimeMillis()));
                     } else {
                         collector.fail(tuple);
    -                    LOG.debug("Tuple Processing Failed : [{}]", tuple);
    +                    LOG.debug("Tuple Processing Failed : [{}]", mappedEvent, cause);
    --- End diff --
    
    The error level is more suitable here. Updated the PR.
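
As a rough illustration of why the level matters for a failed tuple, the sketch below uses `java.util.logging` as a stand-in for the bolt's SLF4J logger (an assumption made so the example runs without extra dependencies; `FINE` plays the role of debug and `SEVERE` the role of error). `FailureLogLevel`, `CapturingHandler`, `logFailure`, and the logger name are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

public class FailureLogLevel {
    // Collects every record that passes the logger's level check.
    static class CapturingHandler extends Handler {
        final List<String> messages = new ArrayList<>();

        @Override
        public void publish(LogRecord record) {
            messages.add(record.getLevel() + ": " + record.getMessage());
        }

        @Override
        public void flush() {
        }

        @Override
        public void close() {
        }
    }

    // Logs the same failure at debug-like and error-like levels and
    // returns what survives the given logger threshold.
    static List<String> logFailure(Level threshold) {
        Logger log = Logger.getLogger("failure-level-demo");
        log.setUseParentHandlers(false); // keep output out of the console handler
        log.setLevel(threshold);
        CapturingHandler handler = new CapturingHandler();
        log.addHandler(handler);
        try {
            log.fine("Tuple Processing Failed : [event-1]");   // debug-like
            log.severe("Tuple Processing Failed : [event-1]"); // error-like
        } finally {
            log.removeHandler(handler);
        }
        return handler.messages;
    }

    public static void main(String[] args) {
        System.out.println(logFailure(Level.INFO)); // typical production threshold
        System.out.println(logFailure(Level.FINE)); // debugging threshold
    }
}
```

With an INFO threshold only the error-like record is captured, so a failure logged at debug level would not show up unless debug logging is switched on.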


---

[GitHub] storm issue #2308: MINOR: Update DruidBeamBolt logs

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on the issue:

    https://github.com/apache/storm/pull/2308
  
    +1


---

[GitHub] storm pull request #2308: MINOR: Update DruidBeamBolt logs

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2308#discussion_r136989219
  
    --- Diff: external/storm-druid/src/main/java/org/apache/storm/druid/bolt/DruidBeamBolt.java ---
    @@ -78,27 +76,28 @@ public void prepare(Map stormConf, TopologyContext context, OutputCollector coll
     
         @Override
         protected void process(final Tuple tuple) {
    -        Future future = tranquilizer.send((druidEventMapper.getEvent(tuple)));
    -        LOG.debug("Sent tuple : [{}]", tuple);
    +        final E mappedEvent = druidEventMapper.getEvent(tuple);
    +        Future future = tranquilizer.send(mappedEvent);
    +        LOG.debug("Sent tuple : [{}]", mappedEvent);
     
             future.addEventListener(new FutureEventListener() {
                 @Override
                 public void onFailure(Throwable cause) {
                     if (cause instanceof MessageDroppedException) {
                         collector.ack(tuple);
    -                    LOG.debug("Tuple Dropped due to MessageDroppedException : [{}]", tuple);
    +                    LOG.debug("Tuple Dropped due to MessageDroppedException : [{}]", mappedEvent, cause);
                         if (druidConfig.getDiscardStreamId() != null)
                             collector.emit(druidConfig.getDiscardStreamId(), new Values(tuple, System.currentTimeMillis()));
                     } else {
                         collector.fail(tuple);
    -                    LOG.debug("Tuple Processing Failed : [{}]", tuple);
    +                    LOG.debug("Tuple Processing Failed : [{}]", mappedEvent, cause);
    --- End diff --
    
    nit: why is a failing tuple logged at debug level?


---

[GitHub] storm issue #2308: MINOR: Update DruidBeamBolt logs

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/2308
  
    @omkreddy I merged this into master and 1.x-branch


---