You are viewing a plain-text version of this content. The canonical link to the original message was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@storm.apache.org by ka...@apache.org on 2018/07/09 21:31:15 UTC

[1/4] storm git commit: STORM-3028 HdfsSpout: handle empty file in case of ackers

Repository: storm
Updated Branches:
  refs/heads/master daec24841 -> da9cb5490


STORM-3028 HdfsSpout: handle empty file in case of ackers


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/285e287c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/285e287c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/285e287c

Branch: refs/heads/master
Commit: 285e287c063c6a12c79553c7a96c9878b0ba3884
Parents: 4f5fdd9
Author: Gergely Hajos <ro...@gmail.com>
Authored: Thu Apr 12 13:58:13 2018 +0200
Committer: Gergely Hajos <ro...@gmail.com>
Committed: Thu Apr 12 14:43:43 2018 +0200

----------------------------------------------------------------------
 .../org/apache/storm/hdfs/spout/HdfsSpout.java  |  7 ++++-
 .../apache/storm/hdfs/spout/TestHdfsSpout.java  | 29 ++++++++++++++++++++
 2 files changed, 35 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/285e287c/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
index 6d626be..d3861df 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
@@ -101,6 +101,7 @@ public class HdfsSpout extends BaseRichSpout {
     private final AtomicBoolean commitTimeElapsed = new AtomicBoolean(false);
     private Timer commitTimer;
     private boolean fileReadCompletely = true;
+    private boolean newReader = false;
 
     private String configKey = Configs.DEFAULT_HDFS_CONFIG_KEY; // key for hdfs Kerberos configs
 
@@ -228,7 +229,10 @@ public class HdfsSpout extends BaseRichSpout {
                         return;
                     } else {
                         fileReadCompletely = false;
+                        newReader = true;
                     }
+                } else {
+                    newReader = false;
                 }
                 if (fileReadCompletely) { // wait for more ACKs before proceeding
                     return;
@@ -250,7 +254,8 @@ public class HdfsSpout extends BaseRichSpout {
                     return;
                 } else {
                     fileReadCompletely = true;
-                    if (!ackEnabled) {
+                    // if newReader is true and tuple is null then it is an empty reader
+                    if (!ackEnabled || newReader) {
                         markFileAsDone(reader.getFilePath());
                     }
                 }

http://git-wip-us.apache.org/repos/asf/storm/blob/285e287c/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
index 67ccde6..fbd87b8 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
@@ -154,6 +154,35 @@ public class TestHdfsSpout {
     }
 
     @Test
+    public void testEmptySimpleText_ACK() throws Exception {
+        Path file1 = new Path(source.toString() + "/file_empty.txt");
+        createTextFile(file1, 0);
+
+        Path file2 = new Path(source.toString() + "/file.txt");
+        createTextFile(file2, 5);
+
+        try (AutoCloseableHdfsSpout closeableSpout = makeSpout(Configs.TEXT, TextFileReader.defaultFields)) {
+            HdfsSpout spout = closeableSpout.spout;
+            spout.setCommitFrequencyCount(1);
+            spout.setCommitFrequencySec(1);
+
+            Map<String, Object> conf = getCommonConfigs();
+            conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, "1"); // enable ACKing
+            openSpout(spout, 0, conf);
+
+            // consume empty file
+            runSpout(spout, "r6");
+            Path arc1 = new Path(archive.toString() + "/file_empty.txt");
+            checkCollectorOutput_txt((MockCollector) spout.getCollector(), arc1);
+
+            // consume file 2
+            runSpout(spout, "r5", "a0", "a1", "a2", "a3", "a4");
+            Path arc2 = new Path(archive.toString() + "/file.txt");
+            checkCollectorOutput_txt((MockCollector) spout.getCollector(), arc1, arc2);
+        }
+    }
+
+    @Test
     public void testResumeAbandoned_Text_NoAck() throws Exception {
         Path file1 = new Path(source.toString() + "/file1.txt");
         createTextFile(file1, 6);


[3/4] storm git commit: STORM-3028 HdfsSpout: handle empty file in case of ackers

Posted by ka...@apache.org.
STORM-3028 HdfsSpout: handle empty file in case of ackers


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/473bf84a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/473bf84a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/473bf84a

Branch: refs/heads/master
Commit: 473bf84a0caef26544b8c01aecd6fb139aa51110
Parents: 3382482
Author: Gergely Hajos <ro...@gmail.com>
Authored: Wed May 16 14:18:56 2018 +0200
Committer: Gergely Hajos <ro...@gmail.com>
Committed: Wed May 16 14:18:56 2018 +0200

----------------------------------------------------------------------
 .../src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java       | 2 --
 1 file changed, 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/473bf84a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
index e34b194..31fabc8 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
@@ -231,8 +231,6 @@ public class HdfsSpout extends BaseRichSpout {
                         fileReadCompletely = false;
                         newReader = true;
                     }
-                } else {
-                    newReader = false;
                 }
                 if (fileReadCompletely) { // wait for more ACKs before proceeding
                     return;


[4/4] storm git commit: Merge branch 'STORM-3028' of https://github.com/ghajos/storm into STORM-3028-merge

Posted by ka...@apache.org.
Merge branch 'STORM-3028' of https://github.com/ghajos/storm into STORM-3028-merge


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/da9cb549
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/da9cb549
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/da9cb549

Branch: refs/heads/master
Commit: da9cb54904bc196dd689c265ce8373f8b3256e8d
Parents: daec248 473bf84
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue Jul 10 06:24:13 2018 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Jul 10 06:24:13 2018 +0900

----------------------------------------------------------------------
 .../org/apache/storm/hdfs/spout/HdfsSpout.java  |  5 +++-
 .../apache/storm/hdfs/spout/TestHdfsSpout.java  | 28 ++++++++++++++++++++
 2 files changed, 32 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/da9cb549/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/da9cb549/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
----------------------------------------------------------------------


[2/4] storm git commit: STORM-3028 HdfsSpout: handle empty file in case of ackers

Posted by ka...@apache.org.
STORM-3028 HdfsSpout: handle empty file in case of ackers


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/33824825
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/33824825
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/33824825

Branch: refs/heads/master
Commit: 33824825850aeeabcd2e14202fe096867481b02e
Parents: 285e287
Author: Gergely Hajos <ro...@gmail.com>
Authored: Fri May 4 13:05:18 2018 +0200
Committer: Gergely Hajos <ro...@gmail.com>
Committed: Fri May 4 13:05:18 2018 +0200

----------------------------------------------------------------------
 .../src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java      | 2 +-
 .../src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java  | 3 +--
 2 files changed, 2 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/33824825/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
index d3861df..e34b194 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
@@ -101,7 +101,6 @@ public class HdfsSpout extends BaseRichSpout {
     private final AtomicBoolean commitTimeElapsed = new AtomicBoolean(false);
     private Timer commitTimer;
     private boolean fileReadCompletely = true;
-    private boolean newReader = false;
 
     private String configKey = Configs.DEFAULT_HDFS_CONFIG_KEY; // key for hdfs Kerberos configs
 
@@ -222,6 +221,7 @@ public class HdfsSpout extends BaseRichSpout {
         while (true) {
             try {
                 // 3) Select a new file if one is not open already
+                boolean newReader = false;
                 if (reader == null) {
                     reader = pickNextFile();
                     if (reader == null) {

http://git-wip-us.apache.org/repos/asf/storm/blob/33824825/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
index fbd87b8..e74bea2 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
@@ -164,14 +164,13 @@ public class TestHdfsSpout {
         try (AutoCloseableHdfsSpout closeableSpout = makeSpout(Configs.TEXT, TextFileReader.defaultFields)) {
             HdfsSpout spout = closeableSpout.spout;
             spout.setCommitFrequencyCount(1);
-            spout.setCommitFrequencySec(1);
 
             Map<String, Object> conf = getCommonConfigs();
             conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, "1"); // enable ACKing
             openSpout(spout, 0, conf);
 
             // consume empty file
-            runSpout(spout, "r6");
+            runSpout(spout, "r1");
             Path arc1 = new Path(archive.toString() + "/file_empty.txt");
             checkCollectorOutput_txt((MockCollector) spout.getCollector(), arc1);