You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2017/09/11 15:42:02 UTC
[1/2] storm git commit: [STORM-2732] Close the eldest writer before
removing it from WritersMap
Repository: storm
Updated Branches:
refs/heads/master a42b10a91 -> 6d23b8b7e
[STORM-2732] Close the eldest writer before removing it from WritersMap
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1275f80f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1275f80f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1275f80f
Branch: refs/heads/master
Commit: 1275f80fc844e73ef4de9e028bf0fcf0f06bcded
Parents: ea0e465
Author: Ethan Li <et...@gmail.com>
Authored: Fri Sep 8 14:17:50 2017 -0500
Committer: Ethan Li <et...@gmail.com>
Committed: Mon Sep 11 10:14:41 2017 -0500
----------------------------------------------------------------------
.../storm/hdfs/bolt/AbstractHdfsBolt.java | 25 +++++++++--
.../apache/storm/hdfs/bolt/TestWritersMap.java | 46 +++++++++++++++++---
2 files changed, 62 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/1275f80f/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
index f9744ea..be39a53 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
@@ -106,7 +106,7 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt {
throw new IllegalStateException("File system URL must be specified.");
}
- writers = new WritersMap(this.maxOpenFiles);
+ writers = new WritersMap(this.maxOpenFiles, collector);
this.collector = collector;
this.fileNameFormat.prepare(conf, topologyContext);
@@ -196,7 +196,7 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt {
}
if (writer != null && writer.needsRotation()) {
- doRotationAndRemoveWriter(writerKey, writer);
+ doRotationAndRemoveWriter(writerKey, writer);
}
}
}
@@ -238,6 +238,7 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt {
//The next tuple will almost certainly fail to write and/or sync, which force a rotation. That
//will give rotateAndReset() a chance to work which includes creating a fresh file handle.
} finally {
+ //rotateOutputFile(writer) has closed the writer. It's safe to remove the writer from the map here.
writers.remove(writerKey);
}
}
@@ -261,9 +262,11 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt {
try {
rotateOutputFile(writer);
} catch (IOException e) {
+ this.collector.reportError(e);
LOG.warn("IOException during scheduled file rotation.", e);
}
}
+ //above for-loop has closed all the writers. It's safe to clear the map here.
writers.clear();
}
@@ -309,15 +312,29 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt {
static class WritersMap extends LinkedHashMap<String, Writer> {
final long maxWriters;
+ final OutputCollector collector;
- public WritersMap(long maxWriters) {
+ public WritersMap(long maxWriters, OutputCollector collector) {
super((int)maxWriters, 0.75f, true);
this.maxWriters = maxWriters;
+ this.collector = collector;
}
@Override
protected boolean removeEldestEntry(Map.Entry<String, Writer> eldest) {
- return this.size() > this.maxWriters;
+ if (this.size() > this.maxWriters) {
+ //The writer must be closed before removed from the map.
+ //If it failed, we might lose some data.
+ try {
+ eldest.getValue().close();
+ } catch (IOException e) {
+ collector.reportError(e);
+ LOG.error("Failed to close the eldest Writer");
+ }
+ return true;
+ } else {
+ return false;
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/1275f80f/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestWritersMap.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestWritersMap.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestWritersMap.java
index fd99efe..9554aba 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestWritersMap.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestWritersMap.java
@@ -15,19 +15,24 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.storm.hdfs.bolt;
+import java.io.IOException;
+import org.apache.hadoop.fs.Path;
+import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
+import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
import org.apache.storm.hdfs.common.AbstractHDFSWriter;
+import org.apache.storm.tuple.Tuple;
import org.junit.Assert;
import org.junit.Test;
-import org.mockito.Mock;
public class TestWritersMap {
- AbstractHdfsBolt.WritersMap map = new AbstractHdfsBolt.WritersMap(2);
- @Mock AbstractHDFSWriter foo;
- @Mock AbstractHDFSWriter bar;
- @Mock AbstractHDFSWriter baz;
+ AbstractHdfsBolt.WritersMap map = new AbstractHdfsBolt.WritersMap(2, null);
+ AbstractHDFSWriterMock foo = new AbstractHDFSWriterMock(new FileSizeRotationPolicy(1, FileSizeRotationPolicy.Units.KB), null);
+ AbstractHDFSWriterMock bar = new AbstractHDFSWriterMock(new FileSizeRotationPolicy(1, FileSizeRotationPolicy.Units.KB), null);
+ AbstractHDFSWriterMock baz = new AbstractHDFSWriterMock(new FileSizeRotationPolicy(1, FileSizeRotationPolicy.Units.KB), null);
@Test public void testLRUBehavior()
{
@@ -44,5 +49,36 @@ public class TestWritersMap {
Assert.assertTrue(map.keySet().contains("BAZ"));
Assert.assertFalse(map.keySet().contains("BAR"));
+
+ // The removed writer should have been closed
+ Assert.assertTrue(bar.isClosed);
+
+ Assert.assertFalse(foo.isClosed);
+ Assert.assertFalse(baz.isClosed);
+ }
+
+ public static final class AbstractHDFSWriterMock extends AbstractHDFSWriter {
+ Boolean isClosed;
+
+ public AbstractHDFSWriterMock(FileRotationPolicy policy, Path path) {
+ super(policy, path);
+ isClosed = false;
+ }
+
+ @Override
+ protected void doWrite(Tuple tuple) throws IOException {
+
+ }
+
+ @Override
+ protected void doSync() throws IOException {
+
+ }
+
+ @Override
+ protected void doClose() throws IOException {
+ isClosed = true;
+ }
}
+
}
[2/2] storm git commit: Merge branch 'STORM-2732' of
https://github.com/Ethanlm/storm into STORM-2732
Posted by bo...@apache.org.
Merge branch 'STORM-2732' of https://github.com/Ethanlm/storm into STORM-2732
STORM-2732: Close the eldest writer before removing it from WritersMap
This closes #2315
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6d23b8b7
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6d23b8b7
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6d23b8b7
Branch: refs/heads/master
Commit: 6d23b8b7e1ff3ef15f60449d7089af452009c220
Parents: a42b10a 1275f80
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Mon Sep 11 10:25:43 2017 -0500
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Mon Sep 11 10:25:43 2017 -0500
----------------------------------------------------------------------
.../storm/hdfs/bolt/AbstractHdfsBolt.java | 25 +++++++++--
.../apache/storm/hdfs/bolt/TestWritersMap.java | 46 +++++++++++++++++---
2 files changed, 62 insertions(+), 9 deletions(-)
----------------------------------------------------------------------