You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2018/07/09 21:31:15 UTC
[1/4] storm git commit: STORM-3028 HdfsSpout: handle empty file in case of ackers
Repository: storm
Updated Branches:
refs/heads/master daec24841 -> da9cb5490
STORM-3028 HdfsSpout: handle empty file in case of ackers
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/285e287c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/285e287c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/285e287c
Branch: refs/heads/master
Commit: 285e287c063c6a12c79553c7a96c9878b0ba3884
Parents: 4f5fdd9
Author: Gergely Hajos <ro...@gmail.com>
Authored: Thu Apr 12 13:58:13 2018 +0200
Committer: Gergely Hajos <ro...@gmail.com>
Committed: Thu Apr 12 14:43:43 2018 +0200
----------------------------------------------------------------------
.../org/apache/storm/hdfs/spout/HdfsSpout.java | 7 ++++-
.../apache/storm/hdfs/spout/TestHdfsSpout.java | 29 ++++++++++++++++++++
2 files changed, 35 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/285e287c/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
index 6d626be..d3861df 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
@@ -101,6 +101,7 @@ public class HdfsSpout extends BaseRichSpout {
private final AtomicBoolean commitTimeElapsed = new AtomicBoolean(false);
private Timer commitTimer;
private boolean fileReadCompletely = true;
+ private boolean newReader = false;
private String configKey = Configs.DEFAULT_HDFS_CONFIG_KEY; // key for hdfs Kerberos configs
@@ -228,7 +229,10 @@ public class HdfsSpout extends BaseRichSpout {
return;
} else {
fileReadCompletely = false;
+ newReader = true;
}
+ } else {
+ newReader = false;
}
if (fileReadCompletely) { // wait for more ACKs before proceeding
return;
@@ -250,7 +254,8 @@ public class HdfsSpout extends BaseRichSpout {
return;
} else {
fileReadCompletely = true;
- if (!ackEnabled) {
+ // if newReader is true and tuple is null then it is an empty reader
+ if (!ackEnabled || newReader) {
markFileAsDone(reader.getFilePath());
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/285e287c/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
index 67ccde6..fbd87b8 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
@@ -154,6 +154,35 @@ public class TestHdfsSpout {
}
@Test
+ public void testEmptySimpleText_ACK() throws Exception {
+ Path file1 = new Path(source.toString() + "/file_empty.txt");
+ createTextFile(file1, 0);
+
+ Path file2 = new Path(source.toString() + "/file.txt");
+ createTextFile(file2, 5);
+
+ try (AutoCloseableHdfsSpout closeableSpout = makeSpout(Configs.TEXT, TextFileReader.defaultFields)) {
+ HdfsSpout spout = closeableSpout.spout;
+ spout.setCommitFrequencyCount(1);
+ spout.setCommitFrequencySec(1);
+
+ Map<String, Object> conf = getCommonConfigs();
+ conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, "1"); // enable ACKing
+ openSpout(spout, 0, conf);
+
+ // consume empty file
+ runSpout(spout, "r6");
+ Path arc1 = new Path(archive.toString() + "/file_empty.txt");
+ checkCollectorOutput_txt((MockCollector) spout.getCollector(), arc1);
+
+ // consume file 2
+ runSpout(spout, "r5", "a0", "a1", "a2", "a3", "a4");
+ Path arc2 = new Path(archive.toString() + "/file.txt");
+ checkCollectorOutput_txt((MockCollector) spout.getCollector(), arc1, arc2);
+ }
+ }
+
+ @Test
public void testResumeAbandoned_Text_NoAck() throws Exception {
Path file1 = new Path(source.toString() + "/file1.txt");
createTextFile(file1, 6);
[3/4] storm git commit: STORM-3028 HdfsSpout: handle empty file in case of ackers
Posted by ka...@apache.org.
STORM-3028 HdfsSpout: handle empty file in case of ackers
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/473bf84a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/473bf84a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/473bf84a
Branch: refs/heads/master
Commit: 473bf84a0caef26544b8c01aecd6fb139aa51110
Parents: 3382482
Author: Gergely Hajos <ro...@gmail.com>
Authored: Wed May 16 14:18:56 2018 +0200
Committer: Gergely Hajos <ro...@gmail.com>
Committed: Wed May 16 14:18:56 2018 +0200
----------------------------------------------------------------------
.../src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java | 2 --
1 file changed, 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/473bf84a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
index e34b194..31fabc8 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
@@ -231,8 +231,6 @@ public class HdfsSpout extends BaseRichSpout {
fileReadCompletely = false;
newReader = true;
}
- } else {
- newReader = false;
}
if (fileReadCompletely) { // wait for more ACKs before proceeding
return;
[4/4] storm git commit: Merge branch 'STORM-3028' of https://github.com/ghajos/storm into STORM-3028-merge
Posted by ka...@apache.org.
Merge branch 'STORM-3028' of https://github.com/ghajos/storm into STORM-3028-merge
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/da9cb549
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/da9cb549
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/da9cb549
Branch: refs/heads/master
Commit: da9cb54904bc196dd689c265ce8373f8b3256e8d
Parents: daec248 473bf84
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Tue Jul 10 06:24:13 2018 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Tue Jul 10 06:24:13 2018 +0900
----------------------------------------------------------------------
.../org/apache/storm/hdfs/spout/HdfsSpout.java | 5 +++-
.../apache/storm/hdfs/spout/TestHdfsSpout.java | 28 ++++++++++++++++++++
2 files changed, 32 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/da9cb549/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/da9cb549/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
----------------------------------------------------------------------
[2/4] storm git commit: STORM-3028 HdfsSpout: handle empty file in case of ackers
Posted by ka...@apache.org.
STORM-3028 HdfsSpout: handle empty file in case of ackers
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/33824825
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/33824825
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/33824825
Branch: refs/heads/master
Commit: 33824825850aeeabcd2e14202fe096867481b02e
Parents: 285e287
Author: Gergely Hajos <ro...@gmail.com>
Authored: Fri May 4 13:05:18 2018 +0200
Committer: Gergely Hajos <ro...@gmail.com>
Committed: Fri May 4 13:05:18 2018 +0200
----------------------------------------------------------------------
.../src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java | 2 +-
.../src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java | 3 +--
2 files changed, 2 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/33824825/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
index d3861df..e34b194 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java
@@ -101,7 +101,6 @@ public class HdfsSpout extends BaseRichSpout {
private final AtomicBoolean commitTimeElapsed = new AtomicBoolean(false);
private Timer commitTimer;
private boolean fileReadCompletely = true;
- private boolean newReader = false;
private String configKey = Configs.DEFAULT_HDFS_CONFIG_KEY; // key for hdfs Kerberos configs
@@ -222,6 +221,7 @@ public class HdfsSpout extends BaseRichSpout {
while (true) {
try {
// 3) Select a new file if one is not open already
+ boolean newReader = false;
if (reader == null) {
reader = pickNextFile();
if (reader == null) {
http://git-wip-us.apache.org/repos/asf/storm/blob/33824825/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
index fbd87b8..e74bea2 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
@@ -164,14 +164,13 @@ public class TestHdfsSpout {
try (AutoCloseableHdfsSpout closeableSpout = makeSpout(Configs.TEXT, TextFileReader.defaultFields)) {
HdfsSpout spout = closeableSpout.spout;
spout.setCommitFrequencyCount(1);
- spout.setCommitFrequencySec(1);
Map<String, Object> conf = getCommonConfigs();
conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, "1"); // enable ACKing
openSpout(spout, 0, conf);
// consume empty file
- runSpout(spout, "r6");
+ runSpout(spout, "r1");
Path arc1 = new Path(archive.toString() + "/file_empty.txt");
checkCollectorOutput_txt((MockCollector) spout.getCollector(), arc1);