You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2016/01/15 17:46:09 UTC
[08/24] storm git commit: Functionally complete. Not well tested.
Have some UTs
http://git-wip-us.apache.org/repos/asf/storm/blob/60e7a812/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
new file mode 100644
index 0000000..9200c90
--- /dev/null
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
@@ -0,0 +1,465 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hdfs.spout;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.storm.hdfs.common.HdfsUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.junit.Before;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import sun.reflect.generics.reflectiveObjects.NotImplementedException;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.storm.hdfs.common.HdfsUtils.Pair;
+
+
+/**
+ * Unit tests for {@link HdfsSpout}. Spins up an in-process MiniDFSCluster,
+ * writes text / sequence files into a source directory, then drives the spout
+ * with scripted nextTuple/ack/fail calls (see {@link #runSpout}) and checks
+ * what was emitted and how files were moved to the archive / bad directories.
+ */
+public class TestHdfsSpout {
+
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+ public File baseFolder;
+ // Per-test HDFS dirs: files to be read, successfully-consumed files, corrupt files.
+ private Path source;
+ private Path archive;
+ private Path badfiles;
+
+
+ public TestHdfsSpout() {
+ }
+
+ // Shared by every test in this class; created once in setupClass().
+ static MiniDFSCluster.Builder builder;
+ static MiniDFSCluster hdfsCluster;
+ static FileSystem fs;
+ static String hdfsURI;
+ static Configuration conf = new Configuration();
+
+ @BeforeClass
+ public static void setupClass() throws IOException {
+ builder = new MiniDFSCluster.Builder(new Configuration());
+ hdfsCluster = builder.build();
+ fs = hdfsCluster.getFileSystem();
+ hdfsURI = "hdfs://localhost:" + hdfsCluster.getNameNodePort() + "/";
+ }
+
+ @AfterClass
+ public static void teardownClass() throws IOException {
+ fs.close();
+ hdfsCluster.shutdown();
+ }
+
+
+ // Fresh source/archive/bad directories under the JUnit temp folder per test.
+ @Before
+ public void setup() throws Exception {
+ baseFolder = tempFolder.newFolder("hdfsspout");
+ source = new Path(baseFolder.toString() + "/source");
+ fs.mkdirs(source);
+ archive = new Path(baseFolder.toString() + "/archive");
+ fs.mkdirs(archive);
+ badfiles = new Path(baseFolder.toString() + "/bad");
+ fs.mkdirs(badfiles);
+
+ }
+
+ @After
+ public void shutDown() throws IOException {
+ fs.delete(new Path(baseFolder.toString()),true);
+ }
+
+ // Reads two 5-line text files (11 nextTuple() calls for 10 lines), acks the
+ // first five tuples, then checks the emitted lines match the file contents
+ // after the files have been moved to the archive dir.
+ @Test
+ public void testSimpleText() throws IOException {
+ Path file1 = new Path(source.toString() + "/file1.txt");
+ createTextFile(file1, 5);
+
+ Path file2 = new Path(source.toString() + "/file2.txt");
+ createTextFile(file2, 5);
+
+ listDir(source);
+
+ Map conf = getDefaultConfig();
+ conf.put(Configs.COMMIT_FREQ_COUNT, "1");
+ conf.put(Configs.COMMIT_FREQ_SEC, "1");
+ HdfsSpout spout = makeSpout(0, conf, Configs.TEXT);
+
+ List<String> res = runSpout(spout,"r11", "a0", "a1", "a2", "a3", "a4");
+ for (String re : res) {
+ System.err.println(re);
+ }
+
+ listCompletedDir();
+ Path arc1 = new Path(archive.toString() + "/file1.txt");
+ Path arc2 = new Path(archive.toString() + "/file2.txt");
+ checkCollectorOutput_txt((MockCollector) spout.getCollector(), arc1, arc2);
+ }
+
+
+ // Asserts the collector emitted exactly the concatenated lines of the given
+ // (already archived) text files, in order.
+ private void checkCollectorOutput_txt(MockCollector collector, Path... txtFiles) throws IOException {
+ ArrayList<String> expected = new ArrayList<>();
+ for (Path txtFile : txtFiles) {
+ List<String> lines= getTextFileContents(fs, txtFile);
+ expected.addAll(lines);
+ }
+
+ List<String> actual = new ArrayList<>();
+ for (Pair<HdfsSpout.MessageId, List<Object>> item : collector.items) {
+ actual.add(item.getValue().get(0).toString());
+ }
+ Assert.assertEquals(expected, actual);
+ }
+
+ // Reads all lines of a UTF-8 text file on the given filesystem.
+ // NOTE(review): the reader is not closed if readLine() throws; consider try-with-resources.
+ private List<String> getTextFileContents(FileSystem fs, Path txtFile) throws IOException {
+ ArrayList<String> result = new ArrayList<>();
+ FSDataInputStream istream = fs.open(txtFile);
+ InputStreamReader isreader = new InputStreamReader(istream,"UTF-8");
+ BufferedReader reader = new BufferedReader(isreader);
+
+ for( String line = reader.readLine(); line!=null; line = reader.readLine() ) {
+ result.add(line);
+ }
+ isreader.close();
+ return result;
+ }
+
+ // Sequence-file analogue of checkCollectorOutput_txt: each emitted line is
+ // expected to be the "[key, value]" string built by getSeqFileContents().
+ private void checkCollectorOutput_seq(MockCollector collector, Path... seqFiles) throws IOException {
+ ArrayList<String> expected = new ArrayList<>();
+ for (Path seqFile : seqFiles) {
+ List<String> lines= getSeqFileContents(fs, seqFile);
+ expected.addAll(lines);
+ }
+ Assert.assertTrue(expected.equals(collector.lines));
+ }
+
+ // Renders every (key, value) record of the given sequence files as a
+ // "[key, value]" string, in file order.
+ // NOTE(review): 'istream' is opened but never used (the SequenceFile.Reader
+ // opens the file itself), and the SequenceFile.Reader is never closed.
+ private List<String> getSeqFileContents(FileSystem fs, Path... seqFiles) throws IOException {
+ ArrayList<String> result = new ArrayList<>();
+
+ for (Path seqFile : seqFiles) {
+ FSDataInputStream istream = fs.open(seqFile);
+ try {
+ SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(seqFile));
+ Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+ Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+ while (reader.next(key, value) ) {
+ String keyValStr = Arrays.asList(key,value).toString();
+ result.add(keyValStr);
+ }
+ } finally {
+ istream.close();
+ }
+ }// for
+ return result;
+ }
+
+ private void listCompletedDir() throws IOException {
+ listDir(source);
+ listDir(archive);
+ }
+
+ private List<String> listBadDir() throws IOException {
+ return listDir(badfiles);
+ }
+
+ // Lists the files directly under p (non-recursive) to stderr and returns their full paths.
+ private List<String> listDir(Path p) throws IOException {
+ ArrayList<String> result = new ArrayList<>();
+ System.err.println("*** Listing " + p);
+ RemoteIterator<LocatedFileStatus> fileNames = fs.listFiles(p, false);
+ while ( fileNames.hasNext() ) {
+ LocatedFileStatus fileStatus = fileNames.next();
+ System.err.println(fileStatus.getPath());
+ result.add(fileStatus.getPath().toString());
+ }
+ return result;
+ }
+
+
+ // NOTE(review): this test replaces source/archive with hard-coded /tmp paths,
+ // bypassing the per-test temp folder; these dirs are NOT cleaned by shutDown(),
+ // so state can leak between runs.
+ @Test
+ public void testSimpleSequenceFile() throws IOException {
+
+ source = new Path("/tmp/hdfsspout/source");
+ fs.mkdirs(source);
+ archive = new Path("/tmp/hdfsspout/archive");
+ fs.mkdirs(archive);
+
+ Path file1 = new Path(source + "/file1.seq");
+ createSeqFile(fs, file1);
+
+ Path file2 = new Path(source + "/file2.seq");
+ createSeqFile(fs, file2);
+
+ Map conf = getDefaultConfig();
+ HdfsSpout spout = makeSpout(0, conf, Configs.SEQ);
+
+ List<String> res = runSpout(spout, "r11", "a0", "a1", "a2", "a3", "a4");
+ for (String re : res) {
+ System.err.println(re);
+ }
+
+ listDir(source);
+
+
+ Path f1 = new Path(archive + "/file1.seq");
+ Path f2 = new Path(archive + "/file2.seq");
+
+ checkCollectorOutput_seq((MockCollector) spout.getCollector(), f1, f2);
+ }
+
+// - TODO: this test needs the spout to fail with an exception
+ @Test
+ public void testFailure() throws Exception {
+
+ Path file1 = new Path(source.toString() + "/file1.txt");
+ createTextFile(file1, 5);
+
+ listDir(source);
+
+ Map conf = getDefaultConfig();
+// conf.put(HdfsSpout.Configs.BACKOFF_SEC, "2");
+ // MockTextFailingReader throws on the 2nd/3rd read, so after 3 nextTuple()
+ // calls the file should have been moved to the bad-files dir.
+ HdfsSpout spout = makeSpout(0, conf, MockTextFailingReader.class.getName());
+ List<String> res = runSpout(spout, "r3");
+ for (String re : res) {
+ System.err.println(re);
+ }
+
+ listCompletedDir();
+ List<String> badFiles = listBadDir();
+ // NOTE(review): JUnit's assertEquals takes (expected, actual); the arguments
+ // in the next two asserts are reversed, which garbles failure messages.
+ Assert.assertEquals( badFiles.size(), 1);
+ Assert.assertEquals(((MockCollector) spout.getCollector()).lines.size(), 1);
+ }
+
+ // NOTE(review): test disabled — the @Test annotation is commented out.
+ // @Test
+ public void testLocking() throws Exception {
+ Path file1 = new Path(source.toString() + "/file1.txt");
+ createTextFile(file1, 5);
+
+ listDir(source);
+
+ Map conf = getDefaultConfig();
+ conf.put(Configs.COMMIT_FREQ_COUNT, "1");
+ conf.put(Configs.COMMIT_FREQ_SEC, "1");
+ HdfsSpout spout = makeSpout(0, conf, Configs.TEXT);
+ List<String> res = runSpout(spout,"r4");
+ for (String re : res) {
+ System.err.println(re);
+ }
+ // After 4 reads there should be exactly one lock file, with one entry per read.
+ List<String> lockFiles = listDir(spout.getLockDirPath());
+ Assert.assertEquals(1, lockFiles.size());
+ runSpout(spout, "r3");
+ List<String> lines = readTextFile(fs, lockFiles.get(0));
+ System.err.println(lines);
+ Assert.assertEquals(6, lines.size());
+ }
+
+ // NOTE(review): the reader (and underlying stream) is never closed.
+ private static List<String> readTextFile(FileSystem fs, String f) throws IOException {
+ Path file = new Path(f);
+ FSDataInputStream x = fs.open(file);
+ BufferedReader reader = new BufferedReader(new InputStreamReader(x));
+ String line = null;
+ ArrayList<String> result = new ArrayList<>();
+ while( (line = reader.readLine()) !=null )
+ result.add( line );
+ return result;
+ }
+
+ // Base spout config pointing at the per-test dirs; the "filesystem" entry
+ // hands the MiniDFSCluster FileSystem straight to the spout.
+ private Map getDefaultConfig() {
+ Map conf = new HashMap();
+ conf.put(Configs.SOURCE_DIR, source.toString());
+ conf.put(Configs.ARCHIVE_DIR, archive.toString());
+ conf.put(Configs.BAD_DIR, badfiles.toString());
+ conf.put("filesystem", fs);
+ return conf;
+ }
+
+
+ // Creates and opens a spout wired to a MockCollector and MockTopologyContext.
+ private static HdfsSpout makeSpout(int spoutId, Map conf, String readerType) {
+ HdfsSpout spout = new HdfsSpout();
+ MockCollector collector = new MockCollector();
+ conf.put(Configs.READER_TYPE, readerType);
+ spout.open(conf, new MockTopologyContext(spoutId), collector);
+ return spout;
+ }
+
+ /**
+ * Execute a sequence of calls to HdfsSpout.
+ *
+ * @param cmds: set of commands to run,
+ * e.g. "r,r,r,r,a1,f2,...". The commands are:
+ * r[N] - nextTuple() called N times
+ * aN - ack, item number: N
+ * fN - fail, item number: N
+ * Returns the collector's cumulative list of emitted tuples (as strings).
+ */
+
+ private List<String> runSpout(HdfsSpout spout, String... cmds) {
+ MockCollector collector = (MockCollector) spout.getCollector();
+ for(String cmd : cmds) {
+ if(cmd.startsWith("r")) {
+ int count = 1;
+ if(cmd.length() > 1) {
+ count = Integer.parseInt(cmd.substring(1));
+ }
+ for(int i=0; i<count; ++i) {
+ spout.nextTuple();
+ }
+ }
+ else if(cmd.startsWith("a")) {
+ int n = Integer.parseInt(cmd.substring(1));
+ Pair<HdfsSpout.MessageId, List<Object>> item = collector.items.get(n);
+ spout.ack(item.getKey());
+ }
+ else if(cmd.startsWith("f")) {
+ int n = Integer.parseInt(cmd.substring(1));
+ Pair<HdfsSpout.MessageId, List<Object>> item = collector.items.get(n);
+ spout.fail(item.getKey());
+ }
+ }
+ return collector.lines;
+ }
+
+ // Writes lines "line 0" .. "line <lineCount-1>" to a new file on the mini cluster.
+ private void createTextFile(Path file, int lineCount) throws IOException {
+ FSDataOutputStream os = fs.create(file);
+ for (int i = 0; i < lineCount; i++) {
+ os.writeBytes("line " + i + System.lineSeparator());
+ }
+ os.close();
+ }
+
+
+
+ // Writes a 5-record IntWritable/Text sequence file.
+ // NOTE(review): the IOException is swallowed after printStackTrace(), so a
+ // failed write surfaces only later as an obscure test failure.
+ private static void createSeqFile(FileSystem fs, Path file) throws IOException {
+
+ Configuration conf = new Configuration();
+ try {
+ if(fs.exists(file)) {
+ fs.delete(file, false);
+ }
+
+ SequenceFile.Writer w = SequenceFile.createWriter(fs, conf, file, IntWritable.class, Text.class );
+ for (int i = 0; i < 5; i++) {
+ w.append(new IntWritable(i), new Text("line " + i));
+ }
+ w.close();
+ System.out.println("done");
+ } catch (IOException e) {
+ e.printStackTrace();
+
+ }
+ }
+
+
+
+ // Captures every emitted tuple (and its message id) in memory for later assertions.
+ static class MockCollector extends SpoutOutputCollector {
+ //comma separated offsets
+ public ArrayList<String> lines;
+ public ArrayList<Pair<HdfsSpout.MessageId, List<Object> > > items;
+
+ public MockCollector() {
+ super(null);
+ lines = new ArrayList<>();
+ items = new ArrayList<>();
+ }
+
+
+
+ @Override
+ public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
+// HdfsSpout.MessageId id = (HdfsSpout.MessageId) messageId;
+// lines.add(id.toString() + ' ' + tuple.toString());
+ lines.add(tuple.toString());
+ items.add(HdfsUtils.Pair.of(messageId, tuple));
+ return null;
+ }
+
+ // NOTE(review): sun.reflect...NotImplementedException is a JDK-internal class;
+ // java.lang.UnsupportedOperationException would be the portable choice here.
+ @Override
+ public void emitDirect(int arg0, String arg1, List<Object> arg2, Object arg3) {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public void reportError(Throwable arg0) {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public long getPendingCount() {
+ return 0;
+ }
+ } // class MockCollector
+
+
+
+ // Throws exceptions for 2nd and 3rd line read attempt
+ static class MockTextFailingReader extends TextFileReader {
+ int readAttempts = 0;
+
+ public MockTextFailingReader(FileSystem fs, Path file, Map conf) throws IOException {
+ super(fs, file, conf);
+ }
+
+ @Override
+ public List<Object> next() throws IOException, ParseException {
+ readAttempts++;
+ if (readAttempts == 2) {
+ throw new IOException("mock test exception");
+ } else if (readAttempts >= 3) {
+ throw new ParseException("mock test exception", null);
+ }
+ return super.next();
+ }
+ }
+
+ // Minimal TopologyContext that only supplies a component id; all other
+ // constructor arguments are null, so most inherited methods are unusable.
+ static class MockTopologyContext extends TopologyContext {
+ private final int componentId;
+
+ public MockTopologyContext(int componentId) {
+ // StormTopology topology, Map stormConf, Map<Integer, String> taskToComponent, Map<String, List<Integer>> componentToSortedTasks, Map<String, Map<String, Fields>> componentToStreamToFields, String stormId, String codeDir, String pidDir, Integer taskId, Integer workerPort, List<Integer> workerTasks, Map<String, Object> defaultResources, Map<String, Object> userResources, Map<String, Object> executorData, Map<Integer, Map<Integer, Map<String, IMetric>>> registeredMetrics, Atom openOrPrepareWasCalled
+ super(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null);
+ this.componentId = componentId;
+ }
+
+ public String getThisComponentId() {
+ return Integer.toString( componentId );
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/60e7a812/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestProgressTracker.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestProgressTracker.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestProgressTracker.java
new file mode 100644
index 0000000..1a00674
--- /dev/null
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestProgressTracker.java
@@ -0,0 +1,108 @@
+package org.apache.storm.hdfs.spout;
+
+
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Tests for {@link ProgressTracker}: the commit position must equal the
+ * highest offset below which EVERY offset has been acked — i.e. it advances
+ * only over a contiguous run of acks, and it tolerates out-of-order and
+ * duplicate acks without moving backwards.
+ */
+public class TestProgressTracker {
+
+ private FileSystem fs;
+ private Configuration conf = new Configuration();
+
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+ public File baseFolder;
+
+ @Before
+ public void setUp() throws Exception {
+ // Local filesystem is sufficient here; no MiniDFSCluster needed.
+ fs = FileSystem.getLocal(conf);
+ }
+
+ @Test
+ public void testBasic() throws Exception {
+ ProgressTracker tracker = new ProgressTracker();
+ baseFolder = tempFolder.newFolder("trackertest");
+
+ Path file = new Path( baseFolder.toString() + Path.SEPARATOR + "testHeadTrimming.txt" );
+ createTextFile(file, 10);
+
+ // create reader and do some checks
+ // NOTE(review): this reader is never closed.
+ TextFileReader reader = new TextFileReader(fs, file, null);
+ FileOffset pos0 = tracker.getCommitPosition();
+ Assert.assertNull(pos0);
+
+ TextFileReader.Offset currOffset = reader.getFileOffset();
+ Assert.assertNotNull(currOffset);
+ Assert.assertEquals(0, currOffset.byteOffset);
+
+ // read 1st line and ack
+ Assert.assertNotNull(reader.next());
+ TextFileReader.Offset pos1 = reader.getFileOffset();
+ tracker.recordAckedOffset(pos1);
+
+ TextFileReader.Offset pos1b = (TextFileReader.Offset) tracker.getCommitPosition();
+ Assert.assertEquals(pos1, pos1b);
+
+ // read 2nd line and ACK
+ Assert.assertNotNull(reader.next());
+ TextFileReader.Offset pos2 = reader.getFileOffset();
+ tracker.recordAckedOffset(pos2);
+
+ tracker.dumpState(System.err);
+ TextFileReader.Offset pos2b = (TextFileReader.Offset) tracker.getCommitPosition();
+ Assert.assertEquals(pos2, pos2b);
+
+
+ // read lines 3..7, don't ACK .. commit pos should remain same
+ Assert.assertNotNull(reader.next());//3
+ TextFileReader.Offset pos3 = reader.getFileOffset();
+ Assert.assertNotNull(reader.next());//4
+ TextFileReader.Offset pos4 = reader.getFileOffset();
+ Assert.assertNotNull(reader.next());//5
+ TextFileReader.Offset pos5 = reader.getFileOffset();
+ Assert.assertNotNull(reader.next());//6
+ TextFileReader.Offset pos6 = reader.getFileOffset();
+ Assert.assertNotNull(reader.next());//7
+ TextFileReader.Offset pos7 = reader.getFileOffset();
+
+ // now ack msg 5 and check
+ // Acks arrive out of order (5, 4, 3): the commit position stays at 2 until
+ // the gap at 3 is filled, then jumps straight to 5.
+ tracker.recordAckedOffset(pos5);
+ Assert.assertEquals(pos2, tracker.getCommitPosition()); // should remain unchanged @ 2
+ tracker.recordAckedOffset(pos4);
+ Assert.assertEquals(pos2, tracker.getCommitPosition()); // should remain unchanged @ 2
+ tracker.recordAckedOffset(pos3);
+ Assert.assertEquals(pos5, tracker.getCommitPosition()); // should be at 5
+
+ tracker.recordAckedOffset(pos6);
+ Assert.assertEquals(pos6, tracker.getCommitPosition()); // should be at 6
+ tracker.recordAckedOffset(pos6); // double ack on same msg
+ Assert.assertEquals(pos6, tracker.getCommitPosition()); // should still be at 6
+
+ tracker.recordAckedOffset(pos7);
+ Assert.assertEquals(pos7, tracker.getCommitPosition()); // should be at 7
+
+ tracker.dumpState(System.err);
+ }
+
+
+
+ // Writes lines "line 0" .. "line <lineCount-1>" to a new local file.
+ private void createTextFile(Path file, int lineCount) throws IOException {
+ FSDataOutputStream os = fs.create(file);
+ for (int i = 0; i < lineCount; i++) {
+ os.writeBytes("line " + i + System.lineSeparator());
+ }
+ os.close();
+ }
+
+}