Posted to dev@storm.apache.org by dossett <gi...@git.apache.org> on 2015/09/29 04:35:50 UTC

[GitHub] storm pull request: STORM-1073: Refactor AbstractHdfsBolt

GitHub user dossett opened a pull request:

    https://github.com/apache/storm/pull/767

    STORM-1073: Refactor AbstractHdfsBolt

    STORM-969 changed the execution of HdfsBolt to be more robust to errors.  This change moves that logic up to AbstractHdfsBolt and adds two new abstract methods to AbstractHdfsBolt: writeTuple and syncTuples.  In other words, AbstractHdfsBolt contains all of the error handling and retry logic for writing to HDFS, and its subclasses specify how to write specific content to HDFS.
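
To illustrate the split described in this pull request, here is a minimal sketch of the template-method pattern. It is not the code from the PR: `RetryingWriter`, `InMemoryWriter`, `writeItem`, and `sync` are hypothetical stand-ins for AbstractHdfsBolt, a concrete bolt, writeTuple, and syncTuples, and the Storm collector is replaced by plain `acked` and `failed` lists.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the abstract bolt: owns batching, retry and ack/fail bookkeeping.
abstract class RetryingWriter<T> {
    private final int retryCount;
    private final List<T> batch = new ArrayList<>();
    final List<T> acked = new ArrayList<>();   // stands in for collector.ack(...)
    final List<T> failed = new ArrayList<>();  // stands in for collector.fail(...)

    RetryingWriter(int retryCount) {
        this.retryCount = retryCount;
    }

    // Subclasses only say how to write one item and how to flush.
    protected abstract void writeItem(T item) throws IOException;
    protected abstract void sync() throws IOException;

    // Final so that subclasses cannot bypass the shared error handling.
    final void execute(T item, boolean forceSync) {
        try {
            writeItem(item);
            batch.add(item);
        } catch (IOException e) {
            // A failed write fails only this item, then flushes what was already written.
            failed.add(item);
            forceSync = true;
        }
        if (forceSync && !batch.isEmpty()) {
            flushWithRetry();
        }
    }

    private void flushWithRetry() {
        IOException last = null;
        for (int attempt = 1; attempt <= retryCount; attempt++) {
            try {
                sync();
                acked.addAll(batch);
                batch.clear();
                return;
            } catch (IOException e) {
                last = e;
            }
        }
        // Every attempt failed: fail the whole batch and give up loudly.
        failed.addAll(batch);
        batch.clear();
        throw new RuntimeException("Sync failed [" + retryCount + "] times.", last);
    }
}

// Hypothetical concrete writer; sync() fails a configurable number of times.
class InMemoryWriter extends RetryingWriter<String> {
    final StringBuilder out = new StringBuilder();
    int syncFailuresLeft;

    InMemoryWriter(int retryCount, int syncFailuresLeft) {
        super(retryCount);
        this.syncFailuresLeft = syncFailuresLeft;
    }

    @Override
    protected void writeItem(String item) {
        out.append(item).append('\n');
    }

    @Override
    protected void sync() throws IOException {
        if (syncFailuresLeft > 0) {
            syncFailuresLeft--;
            throw new IOException("simulated sync failure");
        }
    }
}
```

With a retry count of 3 and two simulated sync failures, a forced sync succeeds on the third attempt and every batched item ends up in `acked`. A new output format would only need another small subclass.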

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/dossett/storm HdfsBolt

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/767.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #767
    
----
commit 375d70a4134ba93be9ff8040334d63551542ca78
Author: Aaron Dossett <aa...@target.com>
Date:   2015-09-29T02:30:14Z

    STORM-1073: Refactor AbstractHdfsBolt

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: STORM-1073: Refactor AbstractHdfsBolt

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/767#discussion_r42546136
  
    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java ---
    @@ -127,9 +145,112 @@ public void run() {
         }
     
         @Override
    +    public final void execute(Tuple tuple) {
    +
    +        synchronized (this.writeLock) {
    +            boolean forceSync = false;
    +            if (TupleUtils.isTick(tuple)) {
    +                LOG.debug("TICK! forcing a file system flush");
    +                forceSync = true;
    +            } else {
    +                try {
    +                    writeTuple(tuple);
    +                    tupleBatch.add(tuple);
    +                } catch (IOException e) {
    +                    //If the write failed, try to sync anything already written
    +                    LOG.info("Tuple failed to write, forcing a flush of existing data.");
    +                    this.collector.reportError(e);
    +                    forceSync = true;
    +                    this.collector.fail(tuple);
    +                }
    +            }
    +
    +            if (this.syncPolicy.mark(tuple, this.offset) || (forceSync && tupleBatch.size() > 0)) {
    +                int attempts = 0;
    +                boolean success = false;
    +                IOException lastException = null;
    +                // Make every attempt to sync the data we have.  If it can't be done then kill the bolt with
    +                // a runtime exception.  The filesystem is presumably in a very bad state.
    +                while (success == false && attempts < fileRetryCount) {
    +                    attempts += 1;
    +                    try {
    +                        syncTuples();
    +                        LOG.debug("Data synced to filesystem. Ack'ing [{}] tuples", tupleBatch.size());
    +                        for (Tuple t : tupleBatch) {
    +                            this.collector.ack(t);
    +                        }
    +                        tupleBatch.clear();
    +                        syncPolicy.reset();
    +                        success = true;
    +                    } catch (IOException e) {
    +                        LOG.warn("Data could not be synced to filesystem on attempt [" + attempts + "]");
    --- End diff --
    
    @dossett as @revans2 brought up, I think it's better to use the curly brace {} placeholder for logging throughout, to be consistent.



[GitHub] storm pull request: STORM-1073: Refactor AbstractHdfsBolt

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/767#discussion_r42497270
  
    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java ---
    @@ -99,6 +109,13 @@ public final void prepare(Map conf, TopologyContext topologyContext, OutputColle
                 }
             }
     
    +        // If interval is non-zero then it has already been explicitly set and we should not default it
    +        if (conf.containsKey("topology.message.timeout.secs") && tickTupleInterval == 0)
    +        {
    +            Integer topologyTimeout = Integer.parseInt(conf.get("topology.message.timeout.secs").toString());
    --- End diff --
    
    Can we please use `Utils.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS))` instead?
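
A rough sketch of what such a conversion helper can look like, assuming that config values may arrive as `Integer`, `Long`, or `String` depending on how the configuration was loaded. `ConfInts.getInt` is a hypothetical stand-in written for illustration; it is not Storm's `Utils.getInt`, whose exact behavior may differ.

```java
// Hypothetical helper, for illustration only.
final class ConfInts {
    private ConfInts() {}

    // Converts a loosely typed config value to an int, rejecting anything ambiguous.
    static int getInt(Object o) {
        if (o instanceof Integer || o instanceof Short || o instanceof Byte) {
            return ((Number) o).intValue();
        }
        if (o instanceof Long) {
            long l = (Long) o;
            if (l < Integer.MIN_VALUE || l > Integer.MAX_VALUE) {
                throw new IllegalArgumentException("Value out of int range: " + l);
            }
            return (int) l;
        }
        if (o instanceof String) {
            return Integer.parseInt(((String) o).trim());
        }
        throw new IllegalArgumentException("Cannot convert " + o + " to int");
    }
}
```

Keeping the conversion in one place avoids a `toString()` and `parseInt` round trip at each call site, and referring to the key through a named constant avoids repeating the string literal.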



[GitHub] storm pull request: STORM-1073: Refactor AbstractHdfsBolt

Posted by dossett <gi...@git.apache.org>.
Github user dossett commented on the pull request:

    https://github.com/apache/storm/pull/767#issuecomment-149346069
  
    @arunmahadevan @harshach Do either of you have additional feedback on this PR?  Thank you!



[GitHub] storm pull request: STORM-1073: Refactor AbstractHdfsBolt

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/767#discussion_r41111274
  
    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java ---
    @@ -89,6 +89,11 @@ public SequenceFileBolt withCompressionType(SequenceFile.CompressionType compres
             return this;
         }
     
    +    public SequenceFileBolt withTickTupleIntervalSeconds(int interval) {
    +        this.tickTupleInterval = interval;
    +        return this;
    +    }
    +
    --- End diff --
    
    Add an option to set the retry count as well in SequenceFileBolt.
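
One way the requested option could look, sketched with hypothetical classes rather than the real bolts. `BaseFileBolt`, `SketchSequenceFileBolt`, `withRetryCount`, and the default of 3 are assumptions made for this illustration; only `withTickTupleIntervalSeconds`, `tickTupleInterval`, and `fileRetryCount` appear in the diffs in this thread.

```java
// Hypothetical base class holding the shared settings.
abstract class BaseFileBolt {
    protected int tickTupleInterval = 0;
    protected int fileRetryCount = 3; // assumed default, not taken from the PR
}

// Hypothetical concrete bolt exposing fluent setters that return its own type.
final class SketchSequenceFileBolt extends BaseFileBolt {
    SketchSequenceFileBolt withTickTupleIntervalSeconds(int interval) {
        this.tickTupleInterval = interval;
        return this;
    }

    // Mirrors the tick-interval setter so both options can be chained.
    SketchSequenceFileBolt withRetryCount(int retryCount) {
        this.fileRetryCount = retryCount;
        return this;
    }

    int getTickTupleInterval() {
        return tickTupleInterval;
    }

    int getRetryCount() {
        return fileRetryCount;
    }
}
```

Returning the concrete type from each setter keeps chained configuration calls usable without casts.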



[GitHub] storm pull request: STORM-1073: Refactor AbstractHdfsBolt

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/767#discussion_r42497567
  
    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java ---
    @@ -127,9 +144,110 @@ public void run() {
         }
     
         @Override
    +    public final void execute(Tuple tuple) {
    +
    +        synchronized (this.writeLock) {
    +            boolean forceSync = false;
    +            if (TupleUtils.isTick(tuple)) {
    +                LOG.debug("TICK! forcing a file system flush");
    +                forceSync = true;
    +            } else {
    +                try {
    +                    writeTuple(tuple);
    +                    tupleBatch.add(tuple);
    +                } catch (IOException e) {
    +                    //If the write failed, try to sync anything already written
    +                    LOG.info("Tuple failed to write, forcing a flush of existing data.");
    +                    this.collector.reportError(e);
    +                    forceSync = true;
    +                    this.collector.fail(tuple);
    +                }
    +            }
    +
    +            if (this.syncPolicy.mark(tuple, this.offset) || (forceSync && tupleBatch.size() > 0)) {
    +                int attempts = 0;
    +                boolean success = false;
    +                IOException lastException = null;
    +                // Make every attempt to sync the data we have.  If it can't be done then kill the bolt with
    +                // a runtime exception.  The filesystem is presumably in a very bad state.
    +                while (success == false && attempts < fileRetryCount) {
    +                    attempts += 1;
    +                    try {
    +                        syncTuples();
    +                        LOG.debug("Data synced to filesystem. Ack'ing [" + tupleBatch.size() + "] tuples");
    --- End diff --
    
    Please use the slf4j parameterized logging API (the {} placeholder), especially for debug logs.
    
    ```LOG.debug("Data synced to filesystem. Ack'ing [{}] tuples",  tupleBatch.size());```
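
The reason this matters most for debug logs can be shown with a small self-contained sketch. `MiniLogger` below is a hypothetical miniature logger written only for this illustration (it is not slf4j and supports a single `{}` placeholder), and `CountingSize` is a hypothetical argument that counts how often it is converted to a string.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical miniature logger: formats the message only when debug is enabled.
final class MiniLogger {
    private final boolean debugEnabled;
    final StringBuilder sink = new StringBuilder();

    MiniLogger(boolean debugEnabled) {
        this.debugEnabled = debugEnabled;
    }

    // Caller has already built the full message, whether or not it is needed.
    void debug(String message) {
        if (debugEnabled) {
            sink.append(message).append('\n');
        }
    }

    // Placeholder form: the argument is converted to text only if debug is on.
    void debug(String format, Object arg) {
        if (!debugEnabled) {
            return;
        }
        int i = format.indexOf("{}");
        String message = i < 0 ? format : format.substring(0, i) + arg + format.substring(i + 2);
        sink.append(message).append('\n');
    }
}

// Hypothetical argument that records every toString() call.
final class CountingSize {
    final AtomicInteger toStringCalls = new AtomicInteger();

    @Override
    public String toString() {
        toStringCalls.incrementAndGet();
        return "42";
    }
}
```

With debug disabled, the concatenated call still pays for building the string before the logger can discard it, while the placeholder call never touches the argument.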



[GitHub] storm pull request: STORM-1073: Refactor AbstractHdfsBolt

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on the pull request:

    https://github.com/apache/storm/pull/767#issuecomment-149425136
  
    looks good to me



[GitHub] storm pull request: STORM-1073: Refactor AbstractHdfsBolt

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/767



[GitHub] storm pull request: STORM-1073: Refactor AbstractHdfsBolt

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/767#discussion_r41111208
  
    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java ---
    @@ -34,13 +37,11 @@
     import org.slf4j.LoggerFactory;
     
     import java.io.IOException;
    -import java.util.ArrayList;
    -import java.util.Map;
    -import java.util.Timer;
    -import java.util.TimerTask;
    +import java.util.*;
    --- End diff --
    
    Better not to use wildcard imports.



[GitHub] storm pull request: STORM-1073: Refactor AbstractHdfsBolt

Posted by dossett <gi...@git.apache.org>.
Github user dossett commented on the pull request:

    https://github.com/apache/storm/pull/767#issuecomment-145528207
  
    Thank you for catching those, @arunmahadevan.  PR updated per your feedback.



[GitHub] storm pull request: STORM-1073: Refactor AbstractHdfsBolt

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on the pull request:

    https://github.com/apache/storm/pull/767#issuecomment-145433935
  
    +1 Overall it looks good once the minor comments are addressed.



[GitHub] storm pull request: STORM-1073: Refactor AbstractHdfsBolt

Posted by dossett <gi...@git.apache.org>.
Github user dossett commented on the pull request:

    https://github.com/apache/storm/pull/767#issuecomment-149578473
  
    @revans2 Thank you for the feedback. I was the author of the previous code, so I have addressed those issues in this PR.  Thanks!



[GitHub] storm pull request: STORM-1073: Refactor AbstractHdfsBolt

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/767#discussion_r42546202
  
    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java ---
    @@ -127,9 +145,112 @@ public void run() {
         }
     
         @Override
    +    public final void execute(Tuple tuple) {
    +
    +        synchronized (this.writeLock) {
    +            boolean forceSync = false;
    +            if (TupleUtils.isTick(tuple)) {
    +                LOG.debug("TICK! forcing a file system flush");
    +                forceSync = true;
    +            } else {
    +                try {
    +                    writeTuple(tuple);
    +                    tupleBatch.add(tuple);
    +                } catch (IOException e) {
    +                    //If the write failed, try to sync anything already written
    +                    LOG.info("Tuple failed to write, forcing a flush of existing data.");
    +                    this.collector.reportError(e);
    +                    forceSync = true;
    +                    this.collector.fail(tuple);
    +                }
    +            }
    +
    +            if (this.syncPolicy.mark(tuple, this.offset) || (forceSync && tupleBatch.size() > 0)) {
    +                int attempts = 0;
    +                boolean success = false;
    +                IOException lastException = null;
    +                // Make every attempt to sync the data we have.  If it can't be done then kill the bolt with
    +                // a runtime exception.  The filesystem is presumably in a very bad state.
    +                while (success == false && attempts < fileRetryCount) {
    +                    attempts += 1;
    +                    try {
    +                        syncTuples();
    +                        LOG.debug("Data synced to filesystem. Ack'ing [{}] tuples", tupleBatch.size());
    +                        for (Tuple t : tupleBatch) {
    +                            this.collector.ack(t);
    +                        }
    +                        tupleBatch.clear();
    +                        syncPolicy.reset();
    +                        success = true;
    +                    } catch (IOException e) {
    +                        LOG.warn("Data could not be synced to filesystem on attempt [" + attempts + "]");
    +                        this.collector.reportError(e);
    +                        lastException = e;
    +                    }
    +                }
    +
    +                // If unsuccesful fail the pending tuples
    +                if (success == false) {
    +                    LOG.warn("Data could not be synced to filesystem, failing this batch of tuples");
    +                    for (Tuple t : tupleBatch) {
    +                        this.collector.fail(t);
    +                    }
    +                    tupleBatch.clear();
    +
    +                    throw new RuntimeException("Sync failed [" + attempts + "] times.", lastException);
    +                }
    +            }
    +
    +            if(this.rotationPolicy.mark(tuple, this.offset)) {
    +                try {
    +                    rotateOutputFile();
    +                    this.rotationPolicy.reset();
    +                    this.offset = 0;
    +                } catch (IOException e) {
    +                    this.collector.reportError(e);
    +                    LOG.warn("File could not be rotated");
    +                    //At this point there is nothing to do.  In all likelihood any filesystem operations will fail.
    +                    //The next tuple will almost certainly fail to write and/or sync, which force a rotation.  That
    +                    //will give rotateAndReset() a chance to work which includes creating a fresh file handle.
    +                }
    +            }
    +        }
    +    }
    +
    +    @Override
    +    public Map<String, Object> getComponentConfiguration() {
    +        Map<String, Object> conf = super.getComponentConfiguration();
    +        if (conf == null)
    +            conf = new Config();
    +
    +        if (tickTupleInterval > 0) {
    +            LOG.info("Enabling tick tuple with interval [" + tickTupleInterval + "]");
    --- End diff --
    
    Use the curly brace {} placeholder for logging here, as above.



[GitHub] storm pull request: STORM-1073: Refactor AbstractHdfsBolt

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the pull request:

    https://github.com/apache/storm/pull/767#issuecomment-149588958
  
    The code looks fine and I am +1 on this, but I would like to give @harshach and @arunmahadevan some time if they want to comment. Any feedback?



[GitHub] storm pull request: STORM-1073: Refactor AbstractHdfsBolt

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/767#discussion_r42497617
  
    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java ---
    @@ -127,9 +144,110 @@ public void run() {
         }
     
         @Override
    +    public final void execute(Tuple tuple) {
    +
    +        synchronized (this.writeLock) {
    +            boolean forceSync = false;
    +            if (TupleUtils.isTick(tuple)) {
    +                LOG.debug("TICK! forcing a file system flush");
    +                forceSync = true;
    +            } else {
    +                try {
    +                    writeTuple(tuple);
    +                    tupleBatch.add(tuple);
    +                } catch (IOException e) {
    +                    //If the write failed, try to sync anything already written
    +                    LOG.info("Tuple failed to write, forcing a flush of existing data.");
    +                    this.collector.reportError(e);
    +                    forceSync = true;
    +                    this.collector.fail(tuple);
    +                }
    +            }
    +
    +            if (this.syncPolicy.mark(tuple, this.offset) || (forceSync && tupleBatch.size() > 0)) {
    +                int attempts = 0;
    +                boolean success = false;
    +                IOException lastException = null;
    +                // Make every attempt to sync the data we have.  If it can't be done then kill the bolt with
    +                // a runtime exception.  The filesystem is presumably in a very bad state.
    +                while (success == false && attempts < fileRetryCount) {
    +                    attempts += 1;
    +                    try {
    +                        syncTuples();
    +                        LOG.debug("Data synced to filesystem. Ack'ing [" + tupleBatch.size() + "] tuples");
    +                        for (Tuple t : tupleBatch)
    --- End diff --
    
    We prefer to have the bodies of all for loops in curly braces '{}'.



[GitHub] storm pull request: STORM-1073: Refactor AbstractHdfsBolt

Posted by dossett <gi...@git.apache.org>.
Github user dossett commented on the pull request:

    https://github.com/apache/storm/pull/767#issuecomment-149691038
  
    Thanks for catching those @arunmahadevan. Fixed those and squashed everything into one commit.



[GitHub] storm pull request: STORM-1073: Refactor AbstractHdfsBolt

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on the pull request:

    https://github.com/apache/storm/pull/767#issuecomment-150937269
  
    +1



[GitHub] storm pull request: STORM-1073: Refactor AbstractHdfsBolt

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the pull request:

    https://github.com/apache/storm/pull/767#issuecomment-149576539
  
    @dossett the code looks fine.  I have two minor nits, but they both existed in the previous code prior to the refactor, so I am +1 either way.
    
    The build failure looks like it was a network hiccup with the maven repository.



[GitHub] storm pull request: STORM-1073: Refactor AbstractHdfsBolt

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on the pull request:

    https://github.com/apache/storm/pull/767#issuecomment-145260302
  
    @arunmahadevan please review this

