You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2017/09/11 15:42:02 UTC

[1/2] storm git commit: [STORM-2732] Close the eldest writer before removing it from WritersMap

Repository: storm
Updated Branches:
  refs/heads/master a42b10a91 -> 6d23b8b7e


[STORM-2732] Close the eldest writer before removing it from WritersMap


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1275f80f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1275f80f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1275f80f

Branch: refs/heads/master
Commit: 1275f80fc844e73ef4de9e028bf0fcf0f06bcded
Parents: ea0e465
Author: Ethan Li <et...@gmail.com>
Authored: Fri Sep 8 14:17:50 2017 -0500
Committer: Ethan Li <et...@gmail.com>
Committed: Mon Sep 11 10:14:41 2017 -0500

----------------------------------------------------------------------
 .../storm/hdfs/bolt/AbstractHdfsBolt.java       | 25 +++++++++--
 .../apache/storm/hdfs/bolt/TestWritersMap.java  | 46 +++++++++++++++++---
 2 files changed, 62 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/1275f80f/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
index f9744ea..be39a53 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
@@ -106,7 +106,7 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt {
             throw new IllegalStateException("File system URL must be specified.");
         }
 
-        writers = new WritersMap(this.maxOpenFiles);
+        writers = new WritersMap(this.maxOpenFiles, collector);
 
         this.collector = collector;
         this.fileNameFormat.prepare(conf, topologyContext);
@@ -196,7 +196,7 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt {
             }
 
             if (writer != null && writer.needsRotation()) {
-                    doRotationAndRemoveWriter(writerKey, writer);
+                doRotationAndRemoveWriter(writerKey, writer);
             }
         }
     }
@@ -238,6 +238,7 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt {
             //The next tuple will almost certainly fail to write and/or sync, which force a rotation.  That
             //will give rotateAndReset() a chance to work which includes creating a fresh file handle.
         } finally {
+            //rotateOutputFile(writer) has closed the writer. It's safe to remove the writer from the map here.
             writers.remove(writerKey);
         }
     }
@@ -261,9 +262,11 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt {
             try {
                 rotateOutputFile(writer);
             } catch (IOException e) {
+                this.collector.reportError(e);
                 LOG.warn("IOException during scheduled file rotation.", e);
             }
         }
+        //above for-loop has closed all the writers. It's safe to clear the map here.
         writers.clear();
     }
 
@@ -309,15 +312,29 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt {
 
     static class WritersMap extends LinkedHashMap<String, Writer> {
         final long maxWriters;
+        final OutputCollector collector;
 
-        public WritersMap(long maxWriters) {
+        public WritersMap(long maxWriters, OutputCollector collector) {
             super((int)maxWriters, 0.75f, true);
             this.maxWriters = maxWriters;
+            this.collector = collector;
         }
 
         @Override
         protected boolean removeEldestEntry(Map.Entry<String, Writer> eldest) {
-            return this.size() > this.maxWriters;
+            if (this.size() > this.maxWriters) {
+                //The writer must be closed before removed from the map.
+                //If it failed, we might lose some data.
+                try {
+                    eldest.getValue().close();
+                } catch (IOException e) {
+                    collector.reportError(e);
+                    LOG.error("Failed to close the eldest Writer");
+                }
+                return true;
+            } else {
+                return false;
+            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/1275f80f/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestWritersMap.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestWritersMap.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestWritersMap.java
index fd99efe..9554aba 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestWritersMap.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestWritersMap.java
@@ -15,19 +15,24 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.storm.hdfs.bolt;
 
+import java.io.IOException;
+import org.apache.hadoop.fs.Path;
+import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
+import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
 import org.apache.storm.hdfs.common.AbstractHDFSWriter;
+import org.apache.storm.tuple.Tuple;
 import org.junit.Assert;
 import org.junit.Test;
-import org.mockito.Mock;
 
 public class TestWritersMap {
 
-    AbstractHdfsBolt.WritersMap map = new AbstractHdfsBolt.WritersMap(2);
-    @Mock AbstractHDFSWriter foo;
-    @Mock AbstractHDFSWriter bar;
-    @Mock AbstractHDFSWriter baz;
+    AbstractHdfsBolt.WritersMap map = new AbstractHdfsBolt.WritersMap(2, null);
+    AbstractHDFSWriterMock foo = new AbstractHDFSWriterMock(new FileSizeRotationPolicy(1, FileSizeRotationPolicy.Units.KB), null);
+    AbstractHDFSWriterMock bar = new AbstractHDFSWriterMock(new FileSizeRotationPolicy(1, FileSizeRotationPolicy.Units.KB), null);
+    AbstractHDFSWriterMock baz = new AbstractHDFSWriterMock(new FileSizeRotationPolicy(1, FileSizeRotationPolicy.Units.KB), null);
 
     @Test public void testLRUBehavior()
     {
@@ -44,5 +49,36 @@ public class TestWritersMap {
         Assert.assertTrue(map.keySet().contains("BAZ"));
 
         Assert.assertFalse(map.keySet().contains("BAR"));
+
+        // The removed writer should have been closed
+        Assert.assertTrue(bar.isClosed);
+
+        Assert.assertFalse(foo.isClosed);
+        Assert.assertFalse(baz.isClosed);
+    }
+
+    public static final class AbstractHDFSWriterMock extends AbstractHDFSWriter {
+        Boolean isClosed;
+
+        public AbstractHDFSWriterMock(FileRotationPolicy policy, Path path) {
+            super(policy, path);
+            isClosed = false;
+        }
+
+        @Override
+        protected void doWrite(Tuple tuple) throws IOException {
+
+        }
+
+        @Override
+        protected void doSync() throws IOException {
+
+        }
+
+        @Override
+        protected void doClose() throws IOException {
+            isClosed = true;
+        }
     }
+
 }


[2/2] storm git commit: Merge branch 'STORM-2732' of https://github.com/Ethanlm/storm into STORM-2732

Posted by bo...@apache.org.
Merge branch 'STORM-2732' of https://github.com/Ethanlm/storm into STORM-2732

STORM-2732: Close the eldest writer before removing it from WritersMap

This closes #2315


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6d23b8b7
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6d23b8b7
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6d23b8b7

Branch: refs/heads/master
Commit: 6d23b8b7e1ff3ef15f60449d7089af452009c220
Parents: a42b10a 1275f80
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Mon Sep 11 10:25:43 2017 -0500
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Mon Sep 11 10:25:43 2017 -0500

----------------------------------------------------------------------
 .../storm/hdfs/bolt/AbstractHdfsBolt.java       | 25 +++++++++--
 .../apache/storm/hdfs/bolt/TestWritersMap.java  | 46 +++++++++++++++++---
 2 files changed, 62 insertions(+), 9 deletions(-)
----------------------------------------------------------------------