You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2019/07/29 21:57:09 UTC
[storm] branch master updated: STORM-3372: Fix NPE when shutting down HdfsBolt, fix storm-hdfs tests not running
This is an automated email from the ASF dual-hosted git repository.
srdo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new d1912ae STORM-3372: Fix NPE when shutting down HdfsBolt, fix storm-hdfs tests not running
new f015130 Merge pull request #2990 from srdo/STORM-3372
d1912ae is described below
commit d1912ae98afe9f470a05e57835c41f056cebb311
Author: Stig Rohde Døssing <sr...@apache.org>
AuthorDate: Thu Apr 4 15:58:55 2019 +0200
STORM-3372: Fix NPE when shutting down HdfsBolt, fix storm-hdfs tests not running
---
external/storm-hdfs-blobstore/pom.xml | 5 +
.../apache/storm/hdfs/blobstore/BlobStoreTest.java | 266 ++++++++++-----------
external/storm-hdfs/pom.xml | 2 +-
.../apache/storm/hdfs/bolt/AbstractHdfsBolt.java | 4 +-
.../org/apache/storm/hdfs/bolt/TestHdfsBolt.java | 12 +
.../hdfs/bolt/format/TestSimpleFileNameFormat.java | 2 +-
.../apache/storm/hdfs/spout/TestHdfsSemantics.java | 5 +-
.../org/apache/storm/hdfs/spout/TestHdfsSpout.java | 25 +-
.../hdfs/testing/MiniDFSClusterExtension.java | 64 +++++
.../storm/hdfs/testing/MiniDFSClusterRule.java | 5 +
.../storm-hdfs/src/test/resources/log4j.properties | 5 +-
external/storm-hdfs/src/test/resources/log4j2.xml | 32 +++
pom.xml | 2 +-
13 files changed, 268 insertions(+), 161 deletions(-)
diff --git a/external/storm-hdfs-blobstore/pom.xml b/external/storm-hdfs-blobstore/pom.xml
index e2b948b..ccc69a6 100644
--- a/external/storm-hdfs-blobstore/pom.xml
+++ b/external/storm-hdfs-blobstore/pom.xml
@@ -208,6 +208,11 @@
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-params</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git a/external/storm-hdfs-blobstore/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java b/external/storm-hdfs-blobstore/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java
index a125793..53cca75 100644
--- a/external/storm-hdfs-blobstore/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java
+++ b/external/storm-hdfs-blobstore/src/test/java/org/apache/storm/hdfs/blobstore/BlobStoreTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.storm.hdfs.blobstore;
+import org.apache.storm.hdfs.testing.MiniDFSClusterExtension;
import org.apache.commons.io.FileUtils;
import org.apache.storm.Config;
import org.apache.storm.blobstore.AtomicOutputStream;
@@ -28,14 +29,9 @@ import org.apache.storm.generated.AccessControlType;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.KeyNotFoundException;
import org.apache.storm.generated.SettableBlobMeta;
-import org.apache.storm.hdfs.testing.MiniDFSClusterRule;
import org.apache.storm.security.auth.FixedGroupsMapping;
import org.apache.storm.security.auth.NimbusPrincipal;
import org.apache.storm.security.auth.SingleUserPrincipal;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,30 +51,34 @@ import java.util.UUID;
import static org.junit.Assert.*;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
public class BlobStoreTest {
- @ClassRule
- public static final MiniDFSClusterRule DFS_CLUSTER_RULE = new MiniDFSClusterRule();
+ @RegisterExtension
+ public static final MiniDFSClusterExtension DFS_CLUSTER_EXTENSION = new MiniDFSClusterExtension();
private static final Logger LOG = LoggerFactory.getLogger(BlobStoreTest.class);
URI base;
- File baseFile;
private static final Map<String, Object> CONF = new HashMap<>();
public static final int READ = 0x01;
public static final int WRITE = 0x02;
public static final int ADMIN = 0x04;
- @Before
+ @BeforeEach
public void init() {
initializeConfigs();
- baseFile = new File("/tmp/blob-store-test-" + UUID.randomUUID());
- base = baseFile.toURI();
}
- @After
+ @AfterEach
public void cleanup()
throws IOException {
- FileUtils.deleteDirectory(baseFile);
}
// Method which initializes nimbus admin
@@ -160,7 +160,7 @@ public class BlobStoreTest {
conf.put(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN, "org.apache.storm.security.auth.DefaultPrincipalToLocal");
conf.put(Config.STORM_BLOBSTORE_REPLICATION_FACTOR, 3);
HdfsBlobStore store = new HdfsBlobStore();
- store.prepareInternal(conf, null, DFS_CLUSTER_RULE.getDfscluster().getConfiguration(0));
+ store.prepareInternal(conf, null, DFS_CLUSTER_EXTENSION.getDfscluster().getConfiguration(0));
return new AutoCloseableBlobStoreContainer(store);
}
@@ -204,15 +204,6 @@ public class BlobStoreTest {
}
}
- @Test
- public void testHdfsWithAuth()
- throws Exception {
- // use different blobstore dir so it doesn't conflict with other tests
- try (AutoCloseableBlobStoreContainer container = initHdfs("/storm/blobstore3")) {
- testWithAuthentication(container.blobStore);
- }
- }
-
// Test for replication.
public void testReplication(String path, BlobStore store)
throws Exception {
@@ -289,133 +280,130 @@ public class BlobStoreTest {
store.deleteBlob("test", getSubject(createSubject));
}
- public Subject getSubject(String name) {
+ public static Subject getSubject(String name) {
Subject subject = new Subject();
SingleUserPrincipal user = new SingleUserPrincipal(name);
subject.getPrincipals().add(user);
return subject;
}
-
- // Check for Blobstore with authentication
- public void testWithAuthentication(BlobStore store)
- throws Exception {
- //Test for Nimbus Admin
- Subject admin = getSubject("admin");
- assertStoreHasExactly(store);
- SettableBlobMeta metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
- try (AtomicOutputStream out = store.createBlob("test", metadata, admin)) {
- assertStoreHasExactly(store, "test");
- out.write(1);
+
+ static enum AuthenticationTestSubject {
+ //Nimbus Admin
+ ADMIN(getSubject("admin")),
+ //Nimbus groups admin
+ ADMIN_GROUPS_USER(getSubject("adminGroupsUser")),
+ //Supervisor admin
+ SUPERVISOR(getSubject("supervisor")),
+ //Nimbus itself
+ NIMBUS(getNimbusSubject());
+
+ private Subject subject;
+
+ private AuthenticationTestSubject(Subject subject) {
+ this.subject = subject;
}
- store.deleteBlob("test", admin);
-
- //Test for Nimbus Groups Admin
- Subject adminsGroupsUser = getSubject("adminsGroupsUser");
- assertStoreHasExactly(store);
- metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
- try (AtomicOutputStream out = store.createBlob("test", metadata, adminsGroupsUser)) {
- assertStoreHasExactly(store, "test");
- out.write(1);
+ }
+
+ @ParameterizedTest
+ @EnumSource(value = AuthenticationTestSubject.class)
+ void testWithAuthentication(AuthenticationTestSubject testSubject) throws Exception {
+ try (AutoCloseableBlobStoreContainer container = initHdfs("/storm/blobstore-auth-" + testSubject.name())) {
+ BlobStore store = container.blobStore;
+ assertStoreHasExactly(store);
+ SettableBlobMeta metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
+ try (AtomicOutputStream out = store.createBlob("test", metadata, testSubject.subject)) {
+ assertStoreHasExactly(store, "test");
+ out.write(1);
+ }
+ store.deleteBlob("test", testSubject.subject);
}
- store.deleteBlob("test", adminsGroupsUser);
-
- //Test for Supervisor Admin
- Subject supervisor = getSubject("supervisor");
- assertStoreHasExactly(store);
- metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
- try (AtomicOutputStream out = store.createBlob("test", metadata, supervisor)) {
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testWithAuthenticationDummy(boolean securityEnabled) throws Exception {
+ try (AutoCloseableBlobStoreContainer container = initHdfs("/storm/blobstore-auth-dummy-sec-" + securityEnabled)) {
+ BlobStore store = container.blobStore;
+ Subject who = getSubject("test_subject");
+ assertStoreHasExactly(store);
+
+ // Tests for case when subject != null (security turned on). The blob is created with
+ // DEFAULT acls when securityEnabled is true and with WORLD_EVERYTHING acls otherwise
+ SettableBlobMeta metadata = new SettableBlobMeta(securityEnabled ? BlobStoreAclHandler.DEFAULT : BlobStoreAclHandler.WORLD_EVERYTHING);
+ try (AtomicOutputStream out = store.createBlob("test", metadata, who)) {
+ out.write(1);
+ }
assertStoreHasExactly(store, "test");
- out.write(1);
+ if (securityEnabled) {
+ // Testing whether acls are set to WORLD_EVERYTHING. Here the acl should not contain WORLD_EVERYTHING because
+ // the subject is neither null nor empty. The ACL should however contain USER_EVERYTHING as user needs to have
+ // complete access to the blob
+ assertTrue("ACL contains WORLD_EVERYTHING", !metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
+ } else {
+ // Testing whether acls are set to WORLD_EVERYTHING
+ assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
+ }
+
+ readAssertEqualsWithAuth(store, who, "test", 1);
+
+ LOG.info("Deleting test");
+ store.deleteBlob("test", who);
+ assertStoreHasExactly(store);
}
- store.deleteBlob("test", supervisor);
-
- //Test for Nimbus itself as a user
- Subject nimbus = getNimbusSubject();
- assertStoreHasExactly(store);
- metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
- try (AtomicOutputStream out = store.createBlob("test", metadata, nimbus)) {
+ }
+
+ @Test
+ void testWithAuthenticationUpdate() throws Exception {
+ try (AutoCloseableBlobStoreContainer container = initHdfs("/storm/blobstore-auth-update")) {
+ BlobStore store = container.blobStore;
+ Subject who = getSubject("test_subject");
+ assertStoreHasExactly(store);
+
+ SettableBlobMeta metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
+ try (AtomicOutputStream out = store.createBlob("test", metadata, who)) {
+ out.write(1);
+ }
assertStoreHasExactly(store, "test");
- out.write(1);
- }
- store.deleteBlob("test", nimbus);
-
- // Test with a dummy test_subject for cases where subject !=null (security turned on)
- Subject who = getSubject("test_subject");
- assertStoreHasExactly(store);
-
- // Tests for case when subject != null (security turned on) and
- // acls for the blob are set to WORLD_EVERYTHING
- metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
- try (AtomicOutputStream out = store.createBlob("test", metadata, who)) {
- out.write(1);
- }
- assertStoreHasExactly(store, "test");
- // Testing whether acls are set to WORLD_EVERYTHING
- assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
- readAssertEqualsWithAuth(store, who, "test", 1);
-
- LOG.info("Deleting test");
- store.deleteBlob("test", who);
- assertStoreHasExactly(store);
-
- // Tests for case when subject != null (security turned on) and
- // acls are not set for the blob (DEFAULT)
- LOG.info("Creating test again");
- metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
- try (AtomicOutputStream out = store.createBlob("test", metadata, who)) {
- out.write(2);
- }
- assertStoreHasExactly(store, "test");
- // Testing whether acls are set to WORLD_EVERYTHING. Here the acl should not contain WORLD_EVERYTHING because
- // the subject is neither null nor empty. The ACL should however contain USER_EVERYTHING as user needs to have
- // complete access to the blob
- assertTrue("ACL does not contain WORLD_EVERYTHING", !metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
- readAssertEqualsWithAuth(store, who, "test", 2);
-
- LOG.info("Updating test");
- try (AtomicOutputStream out = store.updateBlob("test", who)) {
- out.write(3);
- }
- assertStoreHasExactly(store, "test");
- readAssertEqualsWithAuth(store, who, "test", 3);
-
- LOG.info("Updating test again");
- try (AtomicOutputStream out = store.updateBlob("test", who)) {
- out.write(4);
- }
- LOG.info("SLEEPING");
- Thread.sleep(2);
- assertStoreHasExactly(store, "test");
- readAssertEqualsWithAuth(store, who, "test", 3);
+ readAssertEqualsWithAuth(store, who, "test", 1);
+
+ try (AtomicOutputStream out = store.updateBlob("test", who)) {
+ out.write(2);
+ }
+ assertStoreHasExactly(store, "test");
+ readAssertEqualsWithAuth(store, who, "test", 2);
+
+ try (AtomicOutputStream out = store.updateBlob("test", who)) {
+ out.write(3);
+ }
+ assertStoreHasExactly(store, "test");
+ readAssertEqualsWithAuth(store, who, "test", 3);
- //Test for subject with no principals and acls set to WORLD_EVERYTHING
- who = new Subject();
- metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
- LOG.info("Creating test");
- try (AtomicOutputStream out = store.createBlob("test-empty-subject-WE", metadata, who)) {
- out.write(2);
+ LOG.info("Deleting test");
+ store.deleteBlob("test", who);
+ assertStoreHasExactly(store);
}
- assertStoreHasExactly(store, "test-empty-subject-WE", "test");
- // Testing whether acls are set to WORLD_EVERYTHING
- assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
- readAssertEqualsWithAuth(store, who, "test-empty-subject-WE", 2);
-
- //Test for subject with no principals and acls set to DEFAULT
- who = new Subject();
- metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
- LOG.info("Creating other");
- try (AtomicOutputStream out = store.createBlob("test-empty-subject-DEF", metadata, who)) {
- out.write(2);
- }
- assertStoreHasExactly(store, "test-empty-subject-DEF", "test", "test-empty-subject-WE");
- // Testing whether acls are set to WORLD_EVERYTHING
- assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
- readAssertEqualsWithAuth(store, who, "test-empty-subject-DEF", 2);
-
- if (store instanceof HdfsBlobStore) {
- ((HdfsBlobStore) store).fullCleanup(1);
- } else {
- fail("Error the blobstore is of unknowntype");
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testWithAuthenticationNoPrincipal(boolean securityEnabled) throws Exception {
+ try (AutoCloseableBlobStoreContainer container = initHdfs("/storm/blobstore-auth-no-principal-sec-" + securityEnabled)) {
+ BlobStore store = container.blobStore;
+ //Test for subject with no principals
+ Subject who = new Subject();
+ assertStoreHasExactly(store);
+
+ // Tests for case when subject != null but has no principals. The blob is created with
+ // DEFAULT acls when securityEnabled is true and with WORLD_EVERYTHING acls otherwise
+ SettableBlobMeta metadata = new SettableBlobMeta(securityEnabled ? BlobStoreAclHandler.DEFAULT : BlobStoreAclHandler.WORLD_EVERYTHING);
+ try (AtomicOutputStream out = store.createBlob("test", metadata, who)) {
+ out.write(1);
+ }
+ assertStoreHasExactly(store, "test");
+ // With no principals in the subject, the ACL should always be set to WORLD_EVERYTHING
+ assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
+
+ readAssertEqualsWithAuth(store, who, "test", 1);
}
}
@@ -535,6 +523,6 @@ public class BlobStoreTest {
fail("Error the blobstore is of unknowntype");
}
assertStoreHasExactly(store, "test");
- readAssertEquals(store, "test", 3);
+ readAssertEquals(store, "test", 4);
}
}
diff --git a/external/storm-hdfs/pom.xml b/external/storm-hdfs/pom.xml
index d0db240..a19d821 100644
--- a/external/storm-hdfs/pom.xml
+++ b/external/storm-hdfs/pom.xml
@@ -234,7 +234,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
- <configuration>
+ <configuration>
<reuseForks>false</reuseForks>
<forkCount>1</forkCount>
</configuration>
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
index dfcf30f..a145274 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
@@ -247,7 +247,9 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt {
@Override
public void cleanup() {
doRotationAndRemoveAllWriters();
- this.rotationTimer.cancel();
+ if (this.rotationTimer != null) {
+ this.rotationTimer.cancel();
+ }
}
private void doRotationAndRemoveAllWriters() {
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java
index 32844e7..7f63cc0 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java
@@ -56,6 +56,7 @@ import org.mockito.runners.MockitoJUnitRunner;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
+
@RunWith(MockitoJUnitRunner.class)
public class TestHdfsBolt {
@@ -203,6 +204,17 @@ public class TestHdfsBolt {
//Tick should have flushed it
Assert.assertEquals(1, countNonZeroLengthFiles(testRoot));
}
+
+ @Test
+ public void testCleanupDoesNotThrowExceptionWhenRotationPolicyIsNotTimed() {
+ //STORM-3372: Rotation policy other than TimedRotationPolicy causes NPE on cleanup
+ FileRotationPolicy fieldsRotationPolicy =
+ new FileSizeRotationPolicy(10_000, FileSizeRotationPolicy.Units.MB);
+ HdfsBolt bolt = makeHdfsBolt(hdfsURI, 10, 10000f)
+ .withRotationPolicy(fieldsRotationPolicy);
+ bolt.prepare(new Config(), topologyContext, collector);
+ bolt.cleanup();
+ }
public void createBaseDirectory(FileSystem passedFs, String path) throws IOException {
Path p = new Path(path);
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/format/TestSimpleFileNameFormat.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/format/TestSimpleFileNameFormat.java
index a12ae5d..f8e1e5e 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/format/TestSimpleFileNameFormat.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/format/TestSimpleFileNameFormat.java
@@ -69,7 +69,7 @@ public class TestSimpleFileNameFormat {
}
private TopologyContext createTopologyContext(Map<String, Object> topoConf) {
- Map<Integer, String> taskToComponent = new HashMap<Integer, String>();
+ Map<Integer, String> taskToComponent = new HashMap<>();
taskToComponent.put(7, "Xcom");
return new TopologyContext(null, topoConf, taskToComponent, null, null, null, null, null, null, 7, 6703, null, null, null, null,
null, null, null);
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSemantics.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSemantics.java
index 1a278c1..3528a3d 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSemantics.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSemantics.java
@@ -12,6 +12,8 @@
package org.apache.storm.hdfs.spout;
+import static org.hamcrest.core.IsNull.notNullValue;
+
import java.io.IOException;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -30,7 +32,6 @@ import org.junit.Test;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
-import static org.mockito.ArgumentMatchers.notNull;
public class TestHdfsSemantics {
@@ -124,7 +125,7 @@ public class TestHdfsSemantics {
//2 try to append to a closed file
try (FSDataOutputStream os2 = fs.append(file1)) {
- assertThat(os2, notNull());
+ assertThat(os2, notNullValue());
}
}
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
index c3e89dc..133de5d 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
@@ -12,6 +12,9 @@
package org.apache.storm.hdfs.spout;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
@@ -192,6 +195,9 @@ public class TestHdfsSpout {
Path file1 = new Path(source.toString() + "/file_empty.txt");
createTextFile(file1, 0);
+ //Ensure the second file has a later modified timestamp, as the spout should pick the first file first.
+ Thread.sleep(2);
+
Path file2 = new Path(source.toString() + "/file.txt");
createTextFile(file2, 5);
@@ -203,15 +209,13 @@ public class TestHdfsSpout {
conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, "1"); // enable ACKing
openSpout(spout, 0, conf);
- // consume empty file
- runSpout(spout, "r1");
- Path arc1 = new Path(archive.toString() + "/file_empty.txt");
- checkCollectorOutput_txt((MockCollector) spout.getCollector(), arc1);
-
- // consume file 2
- runSpout(spout, "r5", "a0", "a1", "a2", "a3", "a4");
+ // Read once. Since the first file is empty, the spout should continue with file 2
+ runSpout(spout, "r6", "a0", "a1", "a2", "a3", "a4");
+ //File 1 should be moved to archive
+ assertThat(fs.isFile(new Path(archive.toString() + "/file_empty.txt")), is(true));
+ //File 2 should be read
Path arc2 = new Path(archive.toString() + "/file.txt");
- checkCollectorOutput_txt((MockCollector) spout.getCollector(), arc1, arc2);
+ checkCollectorOutput_txt((MockCollector) spout.getCollector(), arc2);
}
}
@@ -681,11 +685,8 @@ public class TestHdfsSpout {
private void createTextFile(Path file, int lineCount) throws IOException {
FSDataOutputStream os = fs.create(file);
- int size = 0;
for (int i = 0; i < lineCount; i++) {
os.writeBytes("line " + i + System.lineSeparator());
- String msg = "line " + i + System.lineSeparator();
- size += msg.getBytes().length;
}
os.close();
}
@@ -772,7 +773,7 @@ public class TestHdfsSpout {
private final int componentId;
public MockTopologyContext(int componentId, Map<String, Object> topoConf) {
- super(null, topoConf, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null);
+ super(null, topoConf, null, null, null, null, null, null, null, 0, 0, null, null, null, null, null, null, null);
this.componentId = componentId;
}
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/testing/MiniDFSClusterExtension.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/testing/MiniDFSClusterExtension.java
new file mode 100644
index 0000000..f88fef5
--- /dev/null
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/testing/MiniDFSClusterExtension.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.testing;
+
+import java.util.function.Supplier;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+public class MiniDFSClusterExtension implements BeforeEachCallback, AfterEachCallback {
+
+ private static final String TEST_BUILD_DATA = "test.build.data";
+
+ private final Supplier<Configuration> hadoopConfSupplier;
+ private Configuration hadoopConf;
+ private MiniDFSCluster dfscluster;
+
+ public MiniDFSClusterExtension() {
+ this(() -> new Configuration());
+ }
+
+ public MiniDFSClusterExtension(Supplier<Configuration> hadoopConfSupplier) {
+ this.hadoopConfSupplier = hadoopConfSupplier;
+ }
+
+ public Configuration getHadoopConf() {
+ return hadoopConf;
+ }
+
+ public MiniDFSCluster getDfscluster() {
+ return dfscluster;
+ }
+
+ @Override
+ public void beforeEach(ExtensionContext arg0) throws Exception {
+ System.setProperty(TEST_BUILD_DATA, "target/test/data");
+ hadoopConf = hadoopConfSupplier.get();
+ dfscluster = new MiniDFSCluster.Builder(hadoopConf).numDataNodes(3).build();
+ dfscluster.waitActive();
+ }
+
+ @Override
+ public void afterEach(ExtensionContext arg0) throws Exception {
+ dfscluster.shutdown();
+ System.clearProperty(TEST_BUILD_DATA);
+ }
+}
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/testing/MiniDFSClusterRule.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/testing/MiniDFSClusterRule.java
index b94fb53..6265a52 100644
--- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/testing/MiniDFSClusterRule.java
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/testing/MiniDFSClusterRule.java
@@ -23,6 +23,10 @@ import org.junit.rules.TestRule;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;
+/**
+ * @deprecated Use {@link MiniDFSClusterExtension} instead, along with JUnit 5 for new tests.
+ */
+@Deprecated
public class MiniDFSClusterRule implements TestRule {
private static final String TEST_BUILD_DATA = "test.build.data";
@@ -57,6 +61,7 @@ public class MiniDFSClusterRule implements TestRule {
hadoopConf = hadoopConfSupplier.get();
dfscluster = new MiniDFSCluster.Builder(hadoopConf).numDataNodes(3).build();
dfscluster.waitActive();
+ base.evaluate();
} finally {
if (dfscluster != null) {
dfscluster.shutdown();
diff --git a/external/storm-hdfs/src/test/resources/log4j.properties b/external/storm-hdfs/src/test/resources/log4j.properties
index 1f92e45..c952abd 100644
--- a/external/storm-hdfs/src/test/resources/log4j.properties
+++ b/external/storm-hdfs/src/test/resources/log4j.properties
@@ -20,7 +20,4 @@ log4j.rootLogger = WARN, out
log4j.appender.out = org.apache.log4j.ConsoleAppender
log4j.appender.out.layout = org.apache.log4j.PatternLayout
-log4j.appender.out.layout.ConversionPattern = %d (%t) [%p - %l] %m%n
-
-log4j.logger.org.apache.storm.hdfs = INFO
-
+log4j.appender.out.layout.ConversionPattern = %d (%t) [%p - %l] %m%n
\ No newline at end of file
diff --git a/external/storm-hdfs/src/test/resources/log4j2.xml b/external/storm-hdfs/src/test/resources/log4j2.xml
new file mode 100755
index 0000000..546b1b3
--- /dev/null
+++ b/external/storm-hdfs/src/test/resources/log4j2.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<Configuration status="WARN">
+ <Appenders>
+ <Console name="Console" target="SYSTEM_OUT">
+ <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n" charset="UTF-8"/>
+ </Console>
+ </Appenders>
+ <Loggers>
+ <Root level="WARN">
+ <AppenderRef ref="Console"/>
+ </Root>
+ <Logger name="org.apache.storm" level="INFO" additivity="false">
+ <AppenderRef ref="Console"/>
+ </Logger>
+ </Loggers>
+</Configuration>
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index f273308..3b587a7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -295,7 +295,7 @@
<servlet.version>3.1.0</servlet.version>
<joda-time.version>2.3</joda-time.version>
<thrift.version>0.12.0</thrift.version>
- <junit.jupiter.version>5.3.2</junit.jupiter.version>
+ <junit.jupiter.version>5.5.0-M1</junit.jupiter.version>
<surefire.version>2.22.1</surefire.version>
<awaitility.version>3.1.0</awaitility.version>
<hdrhistogram.version>2.1.10</hdrhistogram.version>