Posted to dev@storm.apache.org by ghajos <gi...@git.apache.org> on 2018/04/12 12:48:09 UTC

[GitHub] storm pull request #2633: STORM-3028 HdfsSpout: handle empty file in case of...

GitHub user ghajos opened a pull request:

    https://github.com/apache/storm/pull/2633

    STORM-3028 HdfsSpout: handle empty file in case of ackers

    There is a problem with all the storm-hdfs JUnit tests, so the test results may not be valid.
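
The title names the failure mode. Assume, as the title suggests, that with ackers enabled a file is only marked done from ack(), once every emitted tuple has been acked. An empty file emits nothing, so no ack ever arrives and the spout can stall on it. The following is a simplified, hypothetical model of that loop. EmptyFileAckModel, doneFiles() and the handleEmptyFile toggle are illustrative names, not HdfsSpout API. It shows the approach taken in this PR: a per-call newReader flag lets nextTuple() mark a file done itself when it hits end-of-file right after opening it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/**
 * Simplified model of an acking spout's read loop (not the real HdfsSpout).
 * Files are represented only by their line counts.
 */
class EmptyFileAckModel {
    private final int[] lineCounts;                            // lines per file id
    private final Deque<Integer> pending = new ArrayDeque<>(); // file ids not yet opened
    private final List<Integer> done = new ArrayList<>();      // file ids marked done
    private final boolean handleEmptyFile;                     // toggles the empty-file fix

    private Integer current = null;  // open file id, null if none
    private int linesRead = 0;
    private int inFlight = 0;        // emitted tuples still awaiting an ack
    private boolean fileReadCompletely = true;

    EmptyFileAckModel(int[] lineCounts, boolean handleEmptyFile) {
        this.lineCounts = lineCounts;
        this.handleEmptyFile = handleEmptyFile;
        for (int id = 0; id < lineCounts.length; id++) {
            pending.add(id);
        }
    }

    /** Emits at most one tuple per call. */
    void nextTuple() {
        boolean newReader = false;           // local: false at the start of every call
        if (current == null) {
            current = pending.poll();
            if (current == null) {
                return;                      // no files left to process
            }
            linesRead = 0;
            fileReadCompletely = false;
            newReader = true;
        }
        if (linesRead < lineCounts[current]) {
            linesRead++;
            inFlight++;
            // Simplification: the model knows each file's length, so EOF is
            // flagged as soon as the last line is emitted.
            fileReadCompletely = linesRead == lineCounts[current];
            return;
        }
        // Nothing emitted: either an empty file, or a finished file whose
        // tuples are still waiting for acks.
        fileReadCompletely = true;
        if (handleEmptyFile && newReader) {
            markDone();                      // empty file: no ack will ever arrive
        }
    }

    /** Acks one tuple; the file is done once it is fully read and fully acked. */
    void ack() {
        inFlight--;
        if (fileReadCompletely && inFlight == 0) {
            markDone();
        }
    }

    List<Integer> doneFiles() {
        return done;
    }

    private void markDone() {
        done.add(current);
        current = null;
    }
}
```

With handleEmptyFile set to false, the model stays on the empty file forever and never opens the next one. With it set to true, the empty file is finished on the first call and the following file is read and acked normally.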

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ghajos/storm STORM-3028

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/2633.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2633
    
----
commit 285e287c063c6a12c79553c7a96c9878b0ba3884
Author: Gergely Hajos <ro...@...>
Date:   2018-04-12T11:58:13Z

    STORM-3028 HdfsSpout: handle empty file in case of ackers

----


---

[GitHub] storm pull request #2633: STORM-3028 HdfsSpout: handle empty file in case of...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2633#discussion_r184707886
  
    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java ---
    @@ -101,6 +101,7 @@
         private final AtomicBoolean commitTimeElapsed = new AtomicBoolean(false);
         private Timer commitTimer;
         private boolean fileReadCompletely = true;
    +    private boolean newReader = false;
    --- End diff --
    
    Does this need to be a field? It looks to me like it would work fine as a local variable too.
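
The scoping difference behind this question can be shown with a toy class. FlagScopeDemo and its methods are hypothetical names, not HdfsSpout code. A field keeps its value between nextTuple() calls, so a true set on one pass leaks into later passes unless something explicitly resets it. A local declared inside the loop starts out false every time.

```java
/**
 * Toy comparison of a flag kept as a field versus a local variable.
 * Hypothetical names; not HdfsSpout code.
 */
class FlagScopeDemo {
    private boolean newReaderField = false; // keeps its value between calls

    /** Field version: once set, true leaks into later calls unless reset. */
    boolean passWithField(boolean openedNewFile) {
        if (openedNewFile) {
            newReaderField = true;
        }
        return newReaderField;
    }

    /** Local version: each call starts from false. */
    boolean passWithLocal(boolean openedNewFile) {
        boolean newReader = false;
        if (openedNewFile) {
            newReader = true;
        }
        return newReader;
    }
}
```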


---

[GitHub] storm issue #2633: STORM-3028 HdfsSpout: handle empty file in case of ackers

Posted by ghajos <gi...@git.apache.org>.
Github user ghajos commented on the issue:

    https://github.com/apache/storm/pull/2633
  
    @srdo Can you please take a look at it?


---

[GitHub] storm pull request #2633: STORM-3028 HdfsSpout: handle empty file in case of...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2633#discussion_r188594538
  
    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java ---
    @@ -221,14 +221,18 @@ public void nextTuple() {
             while (true) {
                 try {
                     // 3) Select a new file if one is not open already
    +                boolean newReader = false;
                     if (reader == null) {
                         reader = pickNextFile();
                         if (reader == null) {
                             LOG.debug("Currently no new files to process under : " + sourceDirPath);
                             return;
                         } else {
                             fileReadCompletely = false;
    +                        newReader = true;
                         }
    +                } else {
    +                    newReader = false;
    --- End diff --
    
    Nit: This branch is a no-op; newReader is already false.
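
With the else branch dropped, the file-selection step reduces to the shape sketched here. This is a hypothetical reduction: the SelectFileStep class, the Selection enum and the Supplier standing in for pickNextFile() are illustrative, not HdfsSpout API, and a reader is modeled as a plain file name. Because the local is initialized to false, the case where a reader is already open needs no extra assignment.

```java
import java.util.function.Supplier;

/**
 * Hypothetical reduction of step 3 of nextTuple() ("select a new file if one
 * is not open already"). A reader is modeled as a String file name.
 */
class SelectFileStep {
    enum Selection { NO_FILE, NEW_READER, EXISTING_READER }

    String reader;                     // currently open file, null if none
    boolean fileReadCompletely = true;

    SelectFileStep(String reader) {
        this.reader = reader;
    }

    Selection select(Supplier<String> pickNextFile) {
        boolean newReader = false;     // already false, so no else branch is needed
        if (reader == null) {
            reader = pickNextFile.get();
            if (reader == null) {
                return Selection.NO_FILE; // caller returns: nothing to process
            }
            fileReadCompletely = false;
            newReader = true;
        }
        return newReader ? Selection.NEW_READER : Selection.EXISTING_READER;
    }
}
```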


---

[GitHub] storm issue #2633: STORM-3028 HdfsSpout: handle empty file in case of ackers

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the issue:

    https://github.com/apache/storm/pull/2633
  
    @ghajos, any plans to address the question from @srdo?


---

[GitHub] storm issue #2633: STORM-3028 HdfsSpout: handle empty file in case of ackers

Posted by ghajos <gi...@git.apache.org>.
Github user ghajos commented on the issue:

    https://github.com/apache/storm/pull/2633
  
    @revans2 @srdo I'm sorry! There is no reason to set both commit frequency count and commit frequency sec. I hope this is already addressed in the latest commit.


---

[GitHub] storm pull request #2633: STORM-3028 HdfsSpout: handle empty file in case of...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2633#discussion_r184710659
  
    --- Diff: external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java ---
    @@ -153,6 +153,35 @@ public void testSimpleText_ACK() throws Exception {
             }
         }
     
    +    @Test
    +    public void testEmptySimpleText_ACK() throws Exception {
    +        Path file1 = new Path(source.toString() + "/file_empty.txt");
    +        createTextFile(file1, 0);
    +
    +        Path file2 = new Path(source.toString() + "/file.txt");
    +        createTextFile(file2, 5);
    +
    +        try (AutoCloseableHdfsSpout closeableSpout = makeSpout(Configs.TEXT, TextFileReader.defaultFields)) {
    +            HdfsSpout spout = closeableSpout.spout;
    +            spout.setCommitFrequencyCount(1);
    --- End diff --
    
    Is there any reason to set both commit frequency count and commit frequency sec? 
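
The redundancy can be seen from a small model of the commit decision. It assumes the spout commits as soon as either the ack count or the elapsed time crosses its threshold. That is an assumption about HdfsSpout's behavior, modeled by a hypothetical CommitPolicy class that uses explicit timestamps in place of a background timer. Under that assumption, a count of 1 already commits on every ack, so the seconds setting cannot change the outcome of this test.

```java
/**
 * Hypothetical commit policy, assuming a commit is due when EITHER the
 * ack-count threshold OR the time threshold is reached. Explicit timestamps
 * stand in for a background timer.
 */
class CommitPolicy {
    private final int frequencyCount;    // commit after this many acks; <= 0 disables
    private final long frequencyMillis;  // commit once this much time has passed
    private int acksSinceLastCommit = 0;
    private long lastCommitMillis;

    CommitPolicy(int frequencyCount, int frequencySec, long startMillis) {
        this.frequencyCount = frequencyCount;
        this.frequencyMillis = frequencySec * 1000L;
        this.lastCommitMillis = startMillis;
    }

    /** Records one ack at nowMillis and returns true if a commit is due. */
    boolean onAck(long nowMillis) {
        acksSinceLastCommit++;
        boolean countReached = frequencyCount > 0 && acksSinceLastCommit >= frequencyCount;
        boolean timeElapsed = nowMillis - lastCommitMillis >= frequencyMillis;
        if (countReached || timeElapsed) {
            acksSinceLastCommit = 0;
            lastCommitMillis = nowMillis;
            return true;
        }
        return false;
    }
}
```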


---

[GitHub] storm pull request #2633: STORM-3028 HdfsSpout: handle empty file in case of...

Posted by srdo <gi...@git.apache.org>.
Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2633#discussion_r184711553
  
    --- Diff: external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java ---
    @@ -153,6 +153,35 @@ public void testSimpleText_ACK() throws Exception {
             }
         }
     
    +    @Test
    +    public void testEmptySimpleText_ACK() throws Exception {
    +        Path file1 = new Path(source.toString() + "/file_empty.txt");
    +        createTextFile(file1, 0);
    +
    +        Path file2 = new Path(source.toString() + "/file.txt");
    +        createTextFile(file2, 5);
    +
    +        try (AutoCloseableHdfsSpout closeableSpout = makeSpout(Configs.TEXT, TextFileReader.defaultFields)) {
    +            HdfsSpout spout = closeableSpout.spout;
    +            spout.setCommitFrequencyCount(1);
    +            spout.setCommitFrequencySec(1);
    +
    +            Map<String, Object> conf = getCommonConfigs();
    +            conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, "1"); // enable ACKing
    +            openSpout(spout, 0, conf);
    +
    +            // consume empty file
    +            runSpout(spout, "r6");
    --- End diff --
    
    Nit: Isn't it enough to call nextTuple once rather than 6 times here?
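
Suppose "r6" means six consecutive nextTuple() calls. An empty file should then presumably be consumed on the first call, and "r1" would be enough. The hypothetical runner here illustrates that reading of the command string. ReadCommandRunner, NextTupleSource and the "rN" semantics are assumptions, not the actual TestHdfsSpout helper.

```java
/**
 * Hypothetical helper in the spirit of the test's runSpout(spout, "r6"),
 * assuming "rN" means "call nextTuple() N times". Not the real test helper.
 */
class ReadCommandRunner {
    /** The only spout method the runner needs. */
    interface NextTupleSource {
        void nextTuple();
    }

    /** Parses "rN", calls nextTuple() N times, and returns N. */
    static int run(NextTupleSource spout, String command) {
        if (command.length() < 2 || command.charAt(0) != 'r') {
            throw new IllegalArgumentException("expected a command like r1, got: " + command);
        }
        int times = Integer.parseInt(command.substring(1));
        for (int i = 0; i < times; i++) {
            spout.nextTuple();
        }
        return times;
    }
}
```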


---

[GitHub] storm pull request #2633: STORM-3028 HdfsSpout: handle empty file in case of...

Posted by ghajos <gi...@git.apache.org>.
Github user ghajos commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2633#discussion_r188606821
  
    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java ---
    @@ -221,14 +221,18 @@ public void nextTuple() {
             while (true) {
                 try {
                     // 3) Select a new file if one is not open already
    +                boolean newReader = false;
                     if (reader == null) {
                         reader = pickNextFile();
                         if (reader == null) {
                             LOG.debug("Currently no new files to process under : " + sourceDirPath);
                             return;
                         } else {
                             fileReadCompletely = false;
    +                        newReader = true;
                         }
    +                } else {
    +                    newReader = false;
    --- End diff --
    
    Thank you for pointing this out!


---

[GitHub] storm pull request #2633: STORM-3028 HdfsSpout: handle empty file in case of...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/2633


---