You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@storm.apache.org by dossett <gi...@git.apache.org> on 2015/07/23 14:51:57 UTC

[GitHub] storm pull request: STORM-960 HiveBolt should ack tuples only afte...

GitHub user dossett opened a pull request:

    https://github.com/apache/storm/pull/653

    STORM-960 HiveBolt should ack tuples only after flushing

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/dossett/storm HiveBoltAck

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/653.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #653
    
----
commit aac496ade94210888a0ca473df648855c408496b
Author: Sriharsha Chintalapani <ma...@harsha.io>
Date:   2015-07-21T22:42:22Z

    STORM-951. Storm Hive connector leaking connections.

commit 6a5d0dcb44b58063035bfdaac4ebddba401dc914
Author: Aaron Dossett <aa...@target.com>
Date:   2015-07-23T12:50:25Z

    STORM 960: HiveBolt should only ack after succesful flush

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: STORM-960 HiveBolt should ack tuples only afte...

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on the pull request:

    https://github.com/apache/storm/pull/653#issuecomment-125960027
  
    +1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: STORM-960 HiveBolt should ack tuples only afte...

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on the pull request:

    https://github.com/apache/storm/pull/653#issuecomment-126338285
  
    +1 @dossett can you up merge the changes with the branch.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: STORM-960 HiveBolt should ack tuples only afte...

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on a diff in the pull request:

    https://github.com/apache/storm/pull/653#discussion_r35761875
  
    --- Diff: external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java ---
    @@ -104,16 +106,30 @@ public void execute(Tuple tuple) {
                     enableHeartBeatOnAllWriters();
                 }
                 writer.write(options.getMapper().mapRecord(tuple));
    -            currentBatchSize++;
    -            if(currentBatchSize >= options.getBatchSize()) {
    +
    +            tupleBatch.add(tuple);
    +            if(tupleBatch.size() >= options.getBatchSize()) {
                     flushAllWriters();
    -                currentBatchSize = 0;
    +                LOG.info("acknowledging tuples after writers flushed ");
    +                for(Tuple t : tupleBatch)
    +                    collector.ack(t);
    +                tupleBatch.clear();
                 }
    -            collector.ack(tuple);
    +
             } catch(Exception e) {
                 this.collector.reportError(e);
                 collector.fail(tuple);
    -            flushAndCloseWriters();
    +            try {
    +                flushAndCloseWriters();
    +                LOG.info("acknowledging tuples after writers flushed and closed");
    +                for (Tuple t : tupleBatch)
    +                    collector.ack(t);
    --- End diff --
    
    shouldn't we calling collector.fail here


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: STORM-960 HiveBolt should ack tuples only afte...

Posted by dossett <gi...@git.apache.org>.
Github user dossett commented on the pull request:

    https://github.com/apache/storm/pull/653#issuecomment-126317166
  
    Committed changes per feedback from @harshach 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: STORM-960 HiveBolt should ack tuples only afte...

Posted by dossett <gi...@git.apache.org>.
Github user dossett commented on a diff in the pull request:

    https://github.com/apache/storm/pull/653#discussion_r35777256
  
    --- Diff: external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java ---
    @@ -104,16 +106,30 @@ public void execute(Tuple tuple) {
                     enableHeartBeatOnAllWriters();
                 }
                 writer.write(options.getMapper().mapRecord(tuple));
    -            currentBatchSize++;
    -            if(currentBatchSize >= options.getBatchSize()) {
    +
    +            tupleBatch.add(tuple);
    +            if(tupleBatch.size() >= options.getBatchSize()) {
                     flushAllWriters();
    -                currentBatchSize = 0;
    +                LOG.info("acknowledging tuples after writers flushed ");
    +                for(Tuple t : tupleBatch)
    +                    collector.ack(t);
    +                tupleBatch.clear();
                 }
    -            collector.ack(tuple);
    +
             } catch(Exception e) {
                 this.collector.reportError(e);
                 collector.fail(tuple);
    -            flushAndCloseWriters();
    +            try {
    +                flushAndCloseWriters();
    +                LOG.info("acknowledging tuples after writers flushed and closed");
    +                for (Tuple t : tupleBatch)
    +                    collector.ack(t);
    +                tupleBatch.clear();
    +            } catch (Exception e1) {
    +                //If flushAndClose fails assume tuples are lost, do not ack
    +                LOG.warn("Error while flushing and closing writers, tuples will NOT be acknowledged");
    --- End diff --
    
    Great point, agreed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: STORM-960 HiveBolt should ack tuples only afte...

Posted by dossett <gi...@git.apache.org>.
Github user dossett commented on the pull request:

    https://github.com/apache/storm/pull/653#issuecomment-126379477
  
    PR rebased and squashed


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: STORM-960 HiveBolt should ack tuples only afte...

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on the pull request:

    https://github.com/apache/storm/pull/653#issuecomment-126708790
  
    @dossett Thanks for the patch. Pushed to master.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: STORM-960 HiveBolt should ack tuples only afte...

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on a diff in the pull request:

    https://github.com/apache/storm/pull/653#discussion_r35761939
  
    --- Diff: external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java ---
    @@ -43,23 +43,25 @@
     import java.util.concurrent.Executors;
     import java.util.concurrent.TimeUnit;
     import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.BlockingQueue;
    +import java.util.concurrent.LinkedBlockingQueue;
     import java.io.IOException;
     
     public class HiveBolt extends  BaseRichBolt {
         private static final Logger LOG = LoggerFactory.getLogger(HiveBolt.class);
         private OutputCollector collector;
         private HiveOptions options;
    -    private Integer currentBatchSize;
         private ExecutorService callTimeoutPool;
         private transient Timer heartBeatTimer;
         private Boolean kerberosEnabled = false;
         private AtomicBoolean timeToSendHeartBeat = new AtomicBoolean(false);
         private UserGroupInformation ugi = null;
         HashMap<HiveEndPoint, HiveWriter> allWriters;
    +    private BlockingQueue<Tuple> tupleBatch;
     
         public HiveBolt(HiveOptions options) {
             this.options = options;
    -        this.currentBatchSize = 0;
    +        tupleBatch = new LinkedBlockingQueue<Tuple>();
    --- End diff --
    
    any reason for LinkedBlockingQueue instead of a List


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: STORM-960 HiveBolt should ack tuples only afte...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/653


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: STORM-960 HiveBolt should ack tuples only afte...

Posted by dossett <gi...@git.apache.org>.
Github user dossett commented on a diff in the pull request:

    https://github.com/apache/storm/pull/653#discussion_r35777089
  
    --- Diff: external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java ---
    @@ -43,23 +43,25 @@
     import java.util.concurrent.Executors;
     import java.util.concurrent.TimeUnit;
     import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.BlockingQueue;
    +import java.util.concurrent.LinkedBlockingQueue;
     import java.io.IOException;
     
     public class HiveBolt extends  BaseRichBolt {
         private static final Logger LOG = LoggerFactory.getLogger(HiveBolt.class);
         private OutputCollector collector;
         private HiveOptions options;
    -    private Integer currentBatchSize;
         private ExecutorService callTimeoutPool;
         private transient Timer heartBeatTimer;
         private Boolean kerberosEnabled = false;
         private AtomicBoolean timeToSendHeartBeat = new AtomicBoolean(false);
         private UserGroupInformation ugi = null;
         HashMap<HiveEndPoint, HiveWriter> allWriters;
    +    private BlockingQueue<Tuple> tupleBatch;
     
         public HiveBolt(HiveOptions options) {
             this.options = options;
    -        this.currentBatchSize = 0;
    +        tupleBatch = new LinkedBlockingQueue<Tuple>();
    --- End diff --
    
    I was trying to guard against any concurrency issues with that data structure.  I still have not mastered how storm components parallelize, so maybe this isn't needed?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: STORM-960 HiveBolt should ack tuples only afte...

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on a diff in the pull request:

    https://github.com/apache/storm/pull/653#discussion_r35762109
  
    --- Diff: external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java ---
    @@ -104,16 +106,30 @@ public void execute(Tuple tuple) {
                     enableHeartBeatOnAllWriters();
                 }
                 writer.write(options.getMapper().mapRecord(tuple));
    -            currentBatchSize++;
    -            if(currentBatchSize >= options.getBatchSize()) {
    +
    +            tupleBatch.add(tuple);
    +            if(tupleBatch.size() >= options.getBatchSize()) {
                     flushAllWriters();
    -                currentBatchSize = 0;
    +                LOG.info("acknowledging tuples after writers flushed ");
    +                for(Tuple t : tupleBatch)
    +                    collector.ack(t);
    +                tupleBatch.clear();
                 }
    -            collector.ack(tuple);
    +
             } catch(Exception e) {
                 this.collector.reportError(e);
                 collector.fail(tuple);
    -            flushAndCloseWriters();
    +            try {
    +                flushAndCloseWriters();
    +                LOG.info("acknowledging tuples after writers flushed and closed");
    +                for (Tuple t : tupleBatch)
    +                    collector.ack(t);
    +                tupleBatch.clear();
    +            } catch (Exception e1) {
    +                //If flushAndClose fails assume tuples are lost, do not ack
    +                LOG.warn("Error while flushing and closing writers, tuples will NOT be acknowledged");
    --- End diff --
    
    I am not quite sure but isn't it beneficial to calling fail instead of letting them timeout.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: STORM-960 HiveBolt should ack tuples only afte...

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on a diff in the pull request:

    https://github.com/apache/storm/pull/653#discussion_r35793063
  
    --- Diff: external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java ---
    @@ -43,23 +43,25 @@
     import java.util.concurrent.Executors;
     import java.util.concurrent.TimeUnit;
     import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.BlockingQueue;
    +import java.util.concurrent.LinkedBlockingQueue;
     import java.io.IOException;
     
     public class HiveBolt extends  BaseRichBolt {
         private static final Logger LOG = LoggerFactory.getLogger(HiveBolt.class);
         private OutputCollector collector;
         private HiveOptions options;
    -    private Integer currentBatchSize;
         private ExecutorService callTimeoutPool;
         private transient Timer heartBeatTimer;
         private Boolean kerberosEnabled = false;
         private AtomicBoolean timeToSendHeartBeat = new AtomicBoolean(false);
         private UserGroupInformation ugi = null;
         HashMap<HiveEndPoint, HiveWriter> allWriters;
    +    private BlockingQueue<Tuple> tupleBatch;
     
         public HiveBolt(HiveOptions options) {
             this.options = options;
    -        this.currentBatchSize = 0;
    +        tupleBatch = new LinkedBlockingQueue<Tuple>();
    --- End diff --
    
    Bolt internals are not shared across the threads you can safely use List


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---