Posted to commits@storm.apache.org by sr...@apache.org on 2019/07/29 21:57:09 UTC

[storm] branch master updated: STORM-3372: Fix NPE when shutting down HdfsBolt, fix storm-hdfs tests not running

This is an automated email from the ASF dual-hosted git repository.

srdo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new d1912ae  STORM-3372: Fix NPE when shutting down HdfsBolt, fix storm-hdfs tests not running
     new f015130  Merge pull request #2990 from srdo/STORM-3372
d1912ae is described below

commit d1912ae98afe9f470a05e57835c41f056cebb311
Author: Stig Rohde Døssing <sr...@apache.org>
AuthorDate: Thu Apr 4 15:58:55 2019 +0200

    STORM-3372: Fix NPE when shutting down HdfsBolt, fix storm-hdfs tests not running
---
 external/storm-hdfs-blobstore/pom.xml              |   5 +
 .../apache/storm/hdfs/blobstore/BlobStoreTest.java | 266 ++++++++++-----------
 external/storm-hdfs/pom.xml                        |   2 +-
 .../apache/storm/hdfs/bolt/AbstractHdfsBolt.java   |   4 +-
 .../org/apache/storm/hdfs/bolt/TestHdfsBolt.java   |  12 +
 .../hdfs/bolt/format/TestSimpleFileNameFormat.java |   2 +-
 .../apache/storm/hdfs/spout/TestHdfsSemantics.java |   5 +-
 .../org/apache/storm/hdfs/spout/TestHdfsSpout.java |  25 +-
 .../hdfs/testing/MiniDFSClusterExtension.java      |  64 +++++
 .../storm/hdfs/testing/MiniDFSClusterRule.java     |   5 +
 .../storm-hdfs/src/test/resources/log4j.properties |   5 +-
 external/storm-hdfs/src/test/resources/log4j2.xml  |  32 +++
 pom.xml                                            |   2 +-
 13 files changed, 268 insertions(+), 161 deletions(-)

diff --git a/external/storm-hdfs-blobstore/pom.xml b/external/storm-hdfs-blobstore/pom.xml
index e2b948b..ccc69a6 100644
--- a/external/storm-hdfs-blobstore/pom.xml
+++ b/external/storm-hdfs-blobstore/pom.xml
@@ -208,6 +208,11 @@
             <artifactId>guava</artifactId>
             <version>${guava.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-params</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
     <build>
         <plugins>
diff --git a/external/storm-hdfs-blobstore/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java b/external/storm-hdfs-blobstore/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java
index a125793..53cca75 100644
--- a/external/storm-hdfs-blobstore/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java
+++ b/external/storm-hdfs-blobstore/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.storm.hdfs.blobstore;
 
+import org.apache.storm.hdfs.testing.MiniDFSClusterExtension;
 import org.apache.commons.io.FileUtils;
 import org.apache.storm.Config;
 import org.apache.storm.blobstore.AtomicOutputStream;
@@ -28,14 +29,9 @@ import org.apache.storm.generated.AccessControlType;
 import org.apache.storm.generated.AuthorizationException;
 import org.apache.storm.generated.KeyNotFoundException;
 import org.apache.storm.generated.SettableBlobMeta;
-import org.apache.storm.hdfs.testing.MiniDFSClusterRule;
 import org.apache.storm.security.auth.FixedGroupsMapping;
 import org.apache.storm.security.auth.NimbusPrincipal;
 import org.apache.storm.security.auth.SingleUserPrincipal;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,30 +51,34 @@ import java.util.UUID;
 
 import static org.junit.Assert.*;
 
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
 public class BlobStoreTest {
 
-    @ClassRule
-    public static final MiniDFSClusterRule DFS_CLUSTER_RULE = new MiniDFSClusterRule();
+    @RegisterExtension
+    public static final MiniDFSClusterExtension DFS_CLUSTER_EXTENSION = new MiniDFSClusterExtension();
 
     private static final Logger LOG = LoggerFactory.getLogger(BlobStoreTest.class);
     URI base;
-    File baseFile;
     private static final Map<String, Object> CONF = new HashMap<>();
     public static final int READ = 0x01;
     public static final int WRITE = 0x02;
     public static final int ADMIN = 0x04;
 
-    @Before
+    @BeforeEach
     public void init() {
         initializeConfigs();
-        baseFile = new File("/tmp/blob-store-test-" + UUID.randomUUID());
-        base = baseFile.toURI();
     }
 
-    @After
+    @AfterEach
     public void cleanup()
         throws IOException {
-        FileUtils.deleteDirectory(baseFile);
     }
 
     // Method which initializes nimbus admin
@@ -160,7 +160,7 @@ public class BlobStoreTest {
         conf.put(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN, "org.apache.storm.security.auth.DefaultPrincipalToLocal");
         conf.put(Config.STORM_BLOBSTORE_REPLICATION_FACTOR, 3);
         HdfsBlobStore store = new HdfsBlobStore();
-        store.prepareInternal(conf, null, DFS_CLUSTER_RULE.getDfscluster().getConfiguration(0));
+        store.prepareInternal(conf, null, DFS_CLUSTER_EXTENSION.getDfscluster().getConfiguration(0));
         return new AutoCloseableBlobStoreContainer(store);
     }
 
@@ -204,15 +204,6 @@ public class BlobStoreTest {
         }
     }
 
-    @Test
-    public void testHdfsWithAuth()
-        throws Exception {
-        // use different blobstore dir so it doesn't conflict with other tests
-        try (AutoCloseableBlobStoreContainer container = initHdfs("/storm/blobstore3")) {
-            testWithAuthentication(container.blobStore);
-        }
-    }
-
     // Test for replication.
     public void testReplication(String path, BlobStore store)
         throws Exception {
@@ -289,133 +280,130 @@ public class BlobStoreTest {
         store.deleteBlob("test", getSubject(createSubject));
     }
 
-    public Subject getSubject(String name) {
+    public static Subject getSubject(String name) {
         Subject subject = new Subject();
         SingleUserPrincipal user = new SingleUserPrincipal(name);
         subject.getPrincipals().add(user);
         return subject;
     }
-
-    // Check for Blobstore with authentication
-    public void testWithAuthentication(BlobStore store)
-        throws Exception {
-        //Test for Nimbus Admin
-        Subject admin = getSubject("admin");
-        assertStoreHasExactly(store);
-        SettableBlobMeta metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
-        try (AtomicOutputStream out = store.createBlob("test", metadata, admin)) {
-            assertStoreHasExactly(store, "test");
-            out.write(1);
+    
+    static enum AuthenticationTestSubject {
+        //Nimbus Admin
+        ADMIN(getSubject("admin")),
+        //Nimbus groups admin
+        ADMIN_GROUPS_USER(getSubject("adminGroupsUser")),
+        //Supervisor admin
+        SUPERVISOR(getSubject("supervisor")),
+        //Nimbus itself
+        NIMBUS(getNimbusSubject());
+        
+        private Subject subject;
+
+        private AuthenticationTestSubject(Subject subject) {
+            this.subject = subject;
         }
-        store.deleteBlob("test", admin);
-
-        //Test for Nimbus Groups Admin
-        Subject adminsGroupsUser = getSubject("adminsGroupsUser");
-        assertStoreHasExactly(store);
-        metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
-        try (AtomicOutputStream out = store.createBlob("test", metadata, adminsGroupsUser)) {
-            assertStoreHasExactly(store, "test");
-            out.write(1);
+    }
+    
+    @ParameterizedTest
+    @EnumSource(value = AuthenticationTestSubject.class)
+    void testWithAuthentication(AuthenticationTestSubject testSubject) throws Exception {
+        try (AutoCloseableBlobStoreContainer container = initHdfs("/storm/blobstore-auth-" + testSubject.name())) {
+            BlobStore store = container.blobStore;
+            assertStoreHasExactly(store);
+            SettableBlobMeta metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
+            try (AtomicOutputStream out = store.createBlob("test", metadata, testSubject.subject)) {
+                assertStoreHasExactly(store, "test");
+                out.write(1);
+            }
+            store.deleteBlob("test", testSubject.subject);
         }
-        store.deleteBlob("test", adminsGroupsUser);
-
-        //Test for Supervisor Admin
-        Subject supervisor = getSubject("supervisor");
-        assertStoreHasExactly(store);
-        metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
-        try (AtomicOutputStream out = store.createBlob("test", metadata, supervisor)) {
+    }
+    
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testWithAuthenticationDummy(boolean securityEnabled) throws Exception {
+        try (AutoCloseableBlobStoreContainer container = initHdfs("/storm/blobstore-auth-dummy-sec-" + securityEnabled)) {
+            BlobStore store = container.blobStore;
+            Subject who = getSubject("test_subject");
+            assertStoreHasExactly(store);
+
+            // Tests for case when subject != null (security turned on) and
+            // acls for the blob are set to WORLD_EVERYTHING
+            SettableBlobMeta metadata = new SettableBlobMeta(securityEnabled ? BlobStoreAclHandler.DEFAULT : BlobStoreAclHandler.WORLD_EVERYTHING);
+            try (AtomicOutputStream out = store.createBlob("test", metadata, who)) {
+                out.write(1);
+            }
             assertStoreHasExactly(store, "test");
-            out.write(1);
+            if (securityEnabled) {
+                // Testing whether acls are set to WORLD_EVERYTHING. Here the acl should not contain WORLD_EVERYTHING because
+                // the subject is neither null nor empty. The ACL should however contain USER_EVERYTHING as user needs to have
+                // complete access to the blob
+                assertTrue("ACL contains WORLD_EVERYTHING", !metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
+            } else {
+                // Testing whether acls are set to WORLD_EVERYTHING
+                assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
+            }
+            
+            readAssertEqualsWithAuth(store, who, "test", 1);
+
+            LOG.info("Deleting test");
+            store.deleteBlob("test", who);
+            assertStoreHasExactly(store);
         }
-        store.deleteBlob("test", supervisor);
-
-        //Test for Nimbus itself as a user
-        Subject nimbus = getNimbusSubject();
-        assertStoreHasExactly(store);
-        metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
-        try (AtomicOutputStream out = store.createBlob("test", metadata, nimbus)) {
+    }
+    
+    @Test
+    void testWithAuthenticationUpdate() throws Exception {
+        try (AutoCloseableBlobStoreContainer container = initHdfs("/storm/blobstore-auth-update")) {
+            BlobStore store = container.blobStore;
+            Subject who = getSubject("test_subject");
+            assertStoreHasExactly(store);
+
+            SettableBlobMeta metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
+            try (AtomicOutputStream out = store.createBlob("test", metadata, who)) {
+                out.write(1);
+            }
             assertStoreHasExactly(store, "test");
-            out.write(1);
-        }
-        store.deleteBlob("test", nimbus);
-
-        // Test with a dummy test_subject for cases where subject !=null (security turned on)
-        Subject who = getSubject("test_subject");
-        assertStoreHasExactly(store);
-
-        // Tests for case when subject != null (security turned on) and
-        // acls for the blob are set to WORLD_EVERYTHING
-        metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
-        try (AtomicOutputStream out = store.createBlob("test", metadata, who)) {
-            out.write(1);
-        }
-        assertStoreHasExactly(store, "test");
-        // Testing whether acls are set to WORLD_EVERYTHING
-        assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
-        readAssertEqualsWithAuth(store, who, "test", 1);
-
-        LOG.info("Deleting test");
-        store.deleteBlob("test", who);
-        assertStoreHasExactly(store);
-
-        // Tests for case when subject != null (security turned on) and
-        // acls are not set for the blob (DEFAULT)
-        LOG.info("Creating test again");
-        metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
-        try (AtomicOutputStream out = store.createBlob("test", metadata, who)) {
-            out.write(2);
-        }
-        assertStoreHasExactly(store, "test");
-        // Testing whether acls are set to WORLD_EVERYTHING. Here the acl should not contain WORLD_EVERYTHING because
-        // the subject is neither null nor empty. The ACL should however contain USER_EVERYTHING as user needs to have
-        // complete access to the blob
-        assertTrue("ACL does not contain WORLD_EVERYTHING", !metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
-        readAssertEqualsWithAuth(store, who, "test", 2);
-
-        LOG.info("Updating test");
-        try (AtomicOutputStream out = store.updateBlob("test", who)) {
-            out.write(3);
-        }
-        assertStoreHasExactly(store, "test");
-        readAssertEqualsWithAuth(store, who, "test", 3);
-
-        LOG.info("Updating test again");
-        try (AtomicOutputStream out = store.updateBlob("test", who)) {
-            out.write(4);
-        }
-        LOG.info("SLEEPING");
-        Thread.sleep(2);
-        assertStoreHasExactly(store, "test");
-        readAssertEqualsWithAuth(store, who, "test", 3);
+            readAssertEqualsWithAuth(store, who, "test", 1);
+            
+            try (AtomicOutputStream out = store.updateBlob("test", who)) {
+                out.write(2);
+            }
+            assertStoreHasExactly(store, "test");
+            readAssertEqualsWithAuth(store, who, "test", 2);
+            
+            try (AtomicOutputStream out = store.updateBlob("test", who)) {
+                out.write(3);
+            }
+            assertStoreHasExactly(store, "test");
+            readAssertEqualsWithAuth(store, who, "test", 3);
 
-        //Test for subject with no principals and acls set to WORLD_EVERYTHING
-        who = new Subject();
-        metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
-        LOG.info("Creating test");
-        try (AtomicOutputStream out = store.createBlob("test-empty-subject-WE", metadata, who)) {
-            out.write(2);
+            LOG.info("Deleting test");
+            store.deleteBlob("test", who);
+            assertStoreHasExactly(store);
         }
-        assertStoreHasExactly(store, "test-empty-subject-WE", "test");
-        // Testing whether acls are set to WORLD_EVERYTHING
-        assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
-        readAssertEqualsWithAuth(store, who, "test-empty-subject-WE", 2);
-
-        //Test for subject with no principals and acls set to DEFAULT
-        who = new Subject();
-        metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
-        LOG.info("Creating other");
-        try (AtomicOutputStream out = store.createBlob("test-empty-subject-DEF", metadata, who)) {
-            out.write(2);
-        }
-        assertStoreHasExactly(store, "test-empty-subject-DEF", "test", "test-empty-subject-WE");
-        // Testing whether acls are set to WORLD_EVERYTHING
-        assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
-        readAssertEqualsWithAuth(store, who, "test-empty-subject-DEF", 2);
-
-        if (store instanceof HdfsBlobStore) {
-            ((HdfsBlobStore) store).fullCleanup(1);
-        } else {
-            fail("Error the blobstore is of unknowntype");
+    }
+    
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testWithAuthenticationNoPrincipal(boolean securityEnabled) throws Exception {
+        try (AutoCloseableBlobStoreContainer container = initHdfs("/storm/blobstore-auth-no-principal-sec-" + securityEnabled)) {
+            BlobStore store = container.blobStore;
+            //Test for subject with no principals
+            Subject who = new Subject();
+            assertStoreHasExactly(store);
+
+            // Tests for case when subject != null (security turned on) and
+            // acls for the blob are set to WORLD_EVERYTHING
+            SettableBlobMeta metadata = new SettableBlobMeta(securityEnabled ? BlobStoreAclHandler.DEFAULT : BlobStoreAclHandler.WORLD_EVERYTHING);
+            try (AtomicOutputStream out = store.createBlob("test", metadata, who)) {
+                out.write(1);
+            }
+            assertStoreHasExactly(store, "test");
+            // With no principals in the subject ACL should always be set to WORLD_EVERYTHING
+            assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
+            
+            readAssertEqualsWithAuth(store, who, "test", 1);
         }
     }
 
@@ -535,6 +523,6 @@ public class BlobStoreTest {
             fail("Error the blobstore is of unknowntype");
         }
         assertStoreHasExactly(store, "test");
-        readAssertEquals(store, "test", 3);
+        readAssertEquals(store, "test", 4);
     }
 }
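
The BlobStoreTest rewrite above folds one long JUnit 4 method into focused JUnit 5 parameterized tests. For readers new to the API, here is a minimal sketch of the @EnumSource mechanics it leans on (the class, enum, and method names below are illustrative, not part of the commit):

    import org.junit.jupiter.params.ParameterizedTest;
    import org.junit.jupiter.params.provider.EnumSource;

    class EnumSourceSketch {
        enum TestSubject { ADMIN, ADMIN_GROUPS_USER, SUPERVISOR, NIMBUS }

        @ParameterizedTest
        @EnumSource(TestSubject.class)
        void coversEachSubject(TestSubject subject) {
            // JUnit runs one invocation per enum constant; the real test derives
            // a distinct blobstore path from the constant name so runs don't collide.
        }
    }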
diff --git a/external/storm-hdfs/pom.xml b/external/storm-hdfs/pom.xml
index d0db240..a19d821 100644
--- a/external/storm-hdfs/pom.xml
+++ b/external/storm-hdfs/pom.xml
@@ -234,7 +234,7 @@
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-surefire-plugin</artifactId>
-		<configuration>
+                <configuration>
                     <reuseForks>false</reuseForks>
                     <forkCount>1</forkCount>
                 </configuration>
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
index dfcf30f..a145274 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
@@ -247,7 +247,9 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt {
     @Override
     public void cleanup() {
         doRotationAndRemoveAllWriters();
-        this.rotationTimer.cancel();
+        if (this.rotationTimer != null) {
+            this.rotationTimer.cancel();
+        }
     }
 
     private void doRotationAndRemoveAllWriters() {
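
A note on the guard above: the new TestHdfsBolt case later in this commit pins down the failure mode — with a rotation policy other than TimedRotationPolicy, rotationTimer was never created, so cleanup() dereferenced null. A minimal sketch of the guarded-cancel pattern, assuming java.util.Timer and with the class name invented for illustration:

    import java.util.Timer;

    abstract class TimerOwnerSketch {
        // Stays null unless a timed rotation policy scheduled rotation in prepare()
        private Timer rotationTimer;

        public void cleanup() {
            // Guard: only cancel if a timer was ever created, avoiding the NPE
            if (rotationTimer != null) {
                rotationTimer.cancel();
            }
        }
    }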
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java
index 32844e7..7f63cc0 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java
@@ -56,6 +56,7 @@ import org.mockito.runners.MockitoJUnitRunner;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyZeroInteractions;
 
+
 @RunWith(MockitoJUnitRunner.class)
 public class TestHdfsBolt {
 
@@ -203,6 +204,17 @@ public class TestHdfsBolt {
         //Tick should have flushed it
         Assert.assertEquals(1, countNonZeroLengthFiles(testRoot));
     }
+    
+    @Test
+    public void testCleanupDoesNotThrowExceptionWhenRotationPolicyIsNotTimed() {
+        //STORM-3372: Rotation policy other than TimedRotationPolicy causes NPE on cleanup
+        FileRotationPolicy fieldsRotationPolicy =
+            new FileSizeRotationPolicy(10_000, FileSizeRotationPolicy.Units.MB);
+        HdfsBolt bolt = makeHdfsBolt(hdfsURI, 10, 10000f)
+            .withRotationPolicy(fieldsRotationPolicy);
+        bolt.prepare(new Config(), topologyContext, collector);
+        bolt.cleanup();
+    }
 
     public void createBaseDirectory(FileSystem passedFs, String path) throws IOException {
         Path p = new Path(path);
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/format/TestSimpleFileNameFormat.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/format/TestSimpleFileNameFormat.java
index a12ae5d..f8e1e5e 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/format/TestSimpleFileNameFormat.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/format/TestSimpleFileNameFormat.java
@@ -69,7 +69,7 @@ public class TestSimpleFileNameFormat {
     }
 
     private TopologyContext createTopologyContext(Map<String, Object> topoConf) {
-        Map<Integer, String> taskToComponent = new HashMap<Integer, String>();
+        Map<Integer, String> taskToComponent = new HashMap<>();
         taskToComponent.put(7, "Xcom");
         return new TopologyContext(null, topoConf, taskToComponent, null, null, null, null, null, null, 7, 6703, null, null, null, null,
                                    null, null, null);
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSemantics.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSemantics.java
index 1a278c1..3528a3d 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSemantics.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSemantics.java
@@ -12,6 +12,8 @@
 
 package org.apache.storm.hdfs.spout;
 
+import static org.hamcrest.core.IsNull.notNullValue;
+
 import java.io.IOException;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -30,7 +32,6 @@ import org.junit.Test;
 
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
-import static org.mockito.ArgumentMatchers.notNull;
 
 public class TestHdfsSemantics {
 
@@ -124,7 +125,7 @@ public class TestHdfsSemantics {
 
         //2 try to append to a closed file
         try (FSDataOutputStream os2 = fs.append(file1)) {
-            assertThat(os2, notNull());
+            assertThat(os2, notNullValue());
         }
     }
 
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
index c3e89dc..133de5d 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
@@ -12,6 +12,9 @@
 
 package org.apache.storm.hdfs.spout;
 
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.IOException;
@@ -192,6 +195,9 @@ public class TestHdfsSpout {
         Path file1 = new Path(source.toString() + "/file_empty.txt");
         createTextFile(file1, 0);
 
+        //Ensure the second file has a later modified timestamp, as the spout should pick the first file first.
+        Thread.sleep(2);
+
         Path file2 = new Path(source.toString() + "/file.txt");
         createTextFile(file2, 5);
 
@@ -203,15 +209,13 @@ public class TestHdfsSpout {
             conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, "1"); // enable ACKing
             openSpout(spout, 0, conf);
 
-            // consume empty file
-            runSpout(spout, "r1");
-            Path arc1 = new Path(archive.toString() + "/file_empty.txt");
-            checkCollectorOutput_txt((MockCollector) spout.getCollector(), arc1);
-
-            // consume file 2
-            runSpout(spout, "r5", "a0", "a1", "a2", "a3", "a4");
+            // Read once. Since the first file is empty, the spout should continue with file 2
+            runSpout(spout, "r6", "a0", "a1", "a2", "a3", "a4");
+            //File 1 should be moved to archive
+            assertThat(fs.isFile(new Path(archive.toString() + "/file_empty.txt")), is(true));
+            //File 2 should be read
             Path arc2 = new Path(archive.toString() + "/file.txt");
-            checkCollectorOutput_txt((MockCollector) spout.getCollector(), arc1, arc2);
+            checkCollectorOutput_txt((MockCollector) spout.getCollector(), arc2);
         }
     }
 
@@ -681,11 +685,8 @@ public class TestHdfsSpout {
 
     private void createTextFile(Path file, int lineCount) throws IOException {
         FSDataOutputStream os = fs.create(file);
-        int size = 0;
         for (int i = 0; i < lineCount; i++) {
             os.writeBytes("line " + i + System.lineSeparator());
-            String msg = "line " + i + System.lineSeparator();
-            size += msg.getBytes().length;
         }
         os.close();
     }
@@ -772,7 +773,7 @@ public class TestHdfsSpout {
         private final int componentId;
 
         public MockTopologyContext(int componentId, Map<String, Object> topoConf) {
-            super(null, topoConf, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null);
+            super(null, topoConf, null, null, null, null, null, null, null, 0, 0, null, null, null, null, null, null, null);
             this.componentId = componentId;
         }
 
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/testing/MiniDFSClusterExtension.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/testing/MiniDFSClusterExtension.java
new file mode 100644
index 0000000..f88fef5
--- /dev/null
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/testing/MiniDFSClusterExtension.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.testing;
+
+import java.util.function.Supplier;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+public class MiniDFSClusterExtension implements BeforeEachCallback, AfterEachCallback {
+
+    private static final String TEST_BUILD_DATA = "test.build.data";
+
+    private final Supplier<Configuration> hadoopConfSupplier;
+    private Configuration hadoopConf;
+    private MiniDFSCluster dfscluster;
+
+    public MiniDFSClusterExtension() {
+        this(() -> new Configuration());
+    }
+
+    public MiniDFSClusterExtension(Supplier<Configuration> hadoopConfSupplier) {
+        this.hadoopConfSupplier = hadoopConfSupplier;
+    }
+
+    public Configuration getHadoopConf() {
+        return hadoopConf;
+    }
+
+    public MiniDFSCluster getDfscluster() {
+        return dfscluster;
+    }
+
+    @Override
+    public void beforeEach(ExtensionContext arg0) throws Exception {
+        System.setProperty(TEST_BUILD_DATA, "target/test/data");
+        hadoopConf = hadoopConfSupplier.get();
+        dfscluster = new MiniDFSCluster.Builder(hadoopConf).numDataNodes(3).build();
+        dfscluster.waitActive();
+    }
+
+    @Override
+    public void afterEach(ExtensionContext arg0) throws Exception {
+        dfscluster.shutdown();
+        System.clearProperty(TEST_BUILD_DATA);
+    }
+}
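
Usage of the new extension mirrors the BlobStoreTest changes earlier in this commit: register it on a static field, and the beforeEach/afterEach callbacks start and stop the cluster around every test. A minimal sketch (the test class and assertion are illustrative; isClusterUp() is MiniDFSCluster's stock liveness check):

    import static org.junit.jupiter.api.Assertions.assertTrue;

    import org.apache.hadoop.hdfs.MiniDFSCluster;
    import org.apache.storm.hdfs.testing.MiniDFSClusterExtension;
    import org.junit.jupiter.api.Test;
    import org.junit.jupiter.api.extension.RegisterExtension;

    class MiniDfsUsageSketch {
        @RegisterExtension
        static final MiniDFSClusterExtension DFS_CLUSTER = new MiniDFSClusterExtension();

        @Test
        void clusterIsRunning() {
            // beforeEach has already built the cluster and waited for it to go active
            MiniDFSCluster cluster = DFS_CLUSTER.getDfscluster();
            assertTrue(cluster.isClusterUp());
        }
    }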
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/testing/MiniDFSClusterRule.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/testing/MiniDFSClusterRule.java
index b94fb53..6265a52 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/testing/MiniDFSClusterRule.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/testing/MiniDFSClusterRule.java
@@ -23,6 +23,10 @@ import org.junit.rules.TestRule;
 import org.junit.runner.Description;
 import org.junit.runners.model.Statement;
 
+/**
+ * @deprecated Use {@link MiniDFSClusterExtension} instead, along with JUnit 5 for new tests.
+ */
+@Deprecated
 public class MiniDFSClusterRule implements TestRule {
 
     private static final String TEST_BUILD_DATA = "test.build.data";
@@ -57,6 +61,7 @@ public class MiniDFSClusterRule implements TestRule {
                     hadoopConf = hadoopConfSupplier.get();
                     dfscluster = new MiniDFSCluster.Builder(hadoopConf).numDataNodes(3).build();
                     dfscluster.waitActive();
+                    base.evaluate();
                 } finally {
                     if (dfscluster != null) {
                         dfscluster.shutdown();
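
The added base.evaluate() is the "tests not running" half of the fix: the rule's Statement started and shut down the cluster but never invoked the wrapped test, so tests guarded by this rule were silently skipped while still reporting success. A minimal sketch of the JUnit 4 TestRule contract the line restores (ResourceRule and its setup/teardown hooks are invented for illustration):

    import org.junit.rules.TestRule;
    import org.junit.runner.Description;
    import org.junit.runners.model.Statement;

    class ResourceRule implements TestRule {
        @Override
        public Statement apply(Statement base, Description description) {
            return new Statement() {
                @Override
                public void evaluate() throws Throwable {
                    setUp();              // e.g. start the MiniDFSCluster
                    try {
                        base.evaluate();  // the call that was missing: run the test
                    } finally {
                        tearDown();       // always release the resource
                    }
                }
            };
        }

        void setUp() { /* illustrative */ }

        void tearDown() { /* illustrative */ }
    }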
diff --git a/external/storm-hdfs/src/test/resources/log4j.properties b/external/storm-hdfs/src/test/resources/log4j.properties
index 1f92e45..c952abd 100644
--- a/external/storm-hdfs/src/test/resources/log4j.properties
+++ b/external/storm-hdfs/src/test/resources/log4j.properties
@@ -20,7 +20,4 @@ log4j.rootLogger = WARN, out
 
 log4j.appender.out = org.apache.log4j.ConsoleAppender
 log4j.appender.out.layout = org.apache.log4j.PatternLayout
-log4j.appender.out.layout.ConversionPattern = %d (%t) [%p - %l] %m%n
-
-log4j.logger.org.apache.storm.hdfs = INFO
-
+log4j.appender.out.layout.ConversionPattern = %d (%t) [%p - %l] %m%n
\ No newline at end of file
diff --git a/external/storm-hdfs/src/test/resources/log4j2.xml b/external/storm-hdfs/src/test/resources/log4j2.xml
new file mode 100755
index 0000000..546b1b3
--- /dev/null
+++ b/external/storm-hdfs/src/test/resources/log4j2.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<Configuration status="WARN">
+    <Appenders>
+        <Console name="Console" target="SYSTEM_OUT">
+            <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n" charset="UTF-8"/>
+        </Console>
+    </Appenders>
+    <Loggers>
+        <Root level="WARN">
+            <AppenderRef ref="Console"/>
+        </Root>
+        <Logger name="org.apache.storm" level="INFO" additivity="false">
+            <AppenderRef ref="Console"/>
+        </Logger>
+    </Loggers>
+</Configuration>
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index f273308..3b587a7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -295,7 +295,7 @@
         <servlet.version>3.1.0</servlet.version>
         <joda-time.version>2.3</joda-time.version>
         <thrift.version>0.12.0</thrift.version>
-        <junit.jupiter.version>5.3.2</junit.jupiter.version>
+        <junit.jupiter.version>5.5.0-M1</junit.jupiter.version>
         <surefire.version>2.22.1</surefire.version>
         <awaitility.version>3.1.0</awaitility.version>
         <hdrhistogram.version>2.1.10</hdrhistogram.version>