Posted to commits@storm.apache.org by pt...@apache.org on 2016/01/15 17:46:09 UTC

[08/24] storm git commit: Functionally complete. Not well tested. Have some UTs

http://git-wip-us.apache.org/repos/asf/storm/blob/60e7a812/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
new file mode 100644
index 0000000..9200c90
--- /dev/null
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestHdfsSpout.java
@@ -0,0 +1,465 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hdfs.spout;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.storm.hdfs.common.HdfsUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.storm.hdfs.common.HdfsUtils.Pair;
+
+
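+// Exercises HdfsSpout against an in-process MiniDFSCluster: text and sequence
+// file reads, ack/fail handling, bad-file moves, and lock files.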
+public class TestHdfsSpout {
+
+  @Rule
+  public TemporaryFolder tempFolder = new TemporaryFolder();
+  public File baseFolder;
+  private Path source;
+  private Path archive;
+  private Path badfiles;
+
+
+  public TestHdfsSpout() {
+  }
+
+  static MiniDFSCluster.Builder builder;
+  static MiniDFSCluster hdfsCluster;
+  static FileSystem fs;
+  static String hdfsURI;
+  static Configuration conf = new Configuration();
+
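+  // A single MiniDFSCluster is shared across all tests in this class;
+  // per-test source/archive/bad dirs are created in setup().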
+  @BeforeClass
+  public static void setupClass() throws IOException {
+    builder = new MiniDFSCluster.Builder(new Configuration());
+    hdfsCluster = builder.build();
+    fs  = hdfsCluster.getFileSystem();
+    hdfsURI = "hdfs://localhost:" + hdfsCluster.getNameNodePort() + "/";
+  }
+
+  @AfterClass
+  public static void teardownClass() throws IOException {
+    fs.close();
+    hdfsCluster.shutdown();
+  }
+
+
+  @Before
+  public void setup() throws Exception {
+    baseFolder = tempFolder.newFolder("hdfsspout");
+    source = new Path(baseFolder.toString() + "/source");
+    fs.mkdirs(source);
+    archive = new Path(baseFolder.toString() + "/archive");
+    fs.mkdirs(archive);
+    badfiles = new Path(baseFolder.toString() + "/bad");
+    fs.mkdirs(badfiles);
+
+  }
+
+  @After
+  public void shutDown() throws IOException {
+    fs.delete(new Path(baseFolder.toString()),true);
+  }
+
+  @Test
+  public void testSimpleText() throws IOException {
+    Path file1 = new Path(source.toString() + "/file1.txt");
+    createTextFile(file1, 5);
+
+    Path file2 = new Path(source.toString() + "/file2.txt");
+    createTextFile(file2, 5);
+
+    listDir(source);
+
+    Map conf = getDefaultConfig();
+    conf.put(Configs.COMMIT_FREQ_COUNT, "1");
+    conf.put(Configs.COMMIT_FREQ_SEC, "1");
+    HdfsSpout spout = makeSpout(0, conf, Configs.TEXT);
+
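+    // "r11" issues 11 nextTuple() calls -- enough to emit all 10 lines across the two
+    // files (the extra call presumably lets the spout hit EOF and archive the last file);
+    // "a0".."a4" ack the first five emitted tuples.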
+    List<String> res = runSpout(spout,"r11", "a0", "a1", "a2", "a3", "a4");
+    for (String re : res) {
+      System.err.println(re);
+    }
+
+    listCompletedDir();
+    Path arc1 = new Path(archive.toString() + "/file1.txt");
+    Path arc2 = new Path(archive.toString() + "/file2.txt");
+    checkCollectorOutput_txt((MockCollector) spout.getCollector(), arc1, arc2);
+  }
+
+
+  private void checkCollectorOutput_txt(MockCollector collector, Path... txtFiles) throws IOException {
+    ArrayList<String> expected = new ArrayList<>();
+    for (Path txtFile : txtFiles) {
+      List<String> lines= getTextFileContents(fs, txtFile);
+      expected.addAll(lines);
+    }
+
+    List<String> actual = new ArrayList<>();
+    for (Pair<HdfsSpout.MessageId, List<Object>> item : collector.items) {
+      actual.add(item.getValue().get(0).toString());
+    }
+    Assert.assertEquals(expected, actual);
+  }
+
+  private List<String> getTextFileContents(FileSystem fs, Path txtFile) throws IOException {
+    ArrayList<String> result = new ArrayList<>();
+    try (FSDataInputStream istream = fs.open(txtFile);
+         BufferedReader reader = new BufferedReader(new InputStreamReader(istream, "UTF-8"))) {
+      for (String line = reader.readLine(); line != null; line = reader.readLine()) {
+        result.add(line);
+      }
+    }
+    return result;
+  }
+
+  private void checkCollectorOutput_seq(MockCollector collector, Path... seqFiles) throws IOException {
+    ArrayList<String> expected = new ArrayList<>();
+    for (Path seqFile : seqFiles) {
+      List<String> lines= getSeqFileContents(fs, seqFile);
+      expected.addAll(lines);
+    }
+    Assert.assertEquals(expected, collector.lines);
+  }
+
+  private List<String> getSeqFileContents(FileSystem fs, Path... seqFiles) throws IOException {
+    ArrayList<String> result = new ArrayList<>();
+
+    for (Path seqFile : seqFiles) {
+      SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(seqFile));
+      try {
+        Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+        Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+        while (reader.next(key, value)) {
+          String keyValStr = Arrays.asList(key, value).toString();
+          result.add(keyValStr);
+        }
+      } finally {
+        reader.close();
+      }
+    }// for
+    return result;
+  }
+
+  private void listCompletedDir() throws IOException {
+    listDir(source);
+    listDir(archive);
+  }
+
+  private List<String> listBadDir() throws IOException {
+    return listDir(badfiles);
+  }
+
+  private List<String> listDir(Path p) throws IOException {
+    ArrayList<String> result = new ArrayList<>();
+    System.err.println("*** Listing " + p);
+    RemoteIterator<LocatedFileStatus> fileNames =  fs.listFiles(p, false);
+    while ( fileNames.hasNext() ) {
+      LocatedFileStatus fileStatus = fileNames.next();
+      System.err.println(fileStatus.getPath());
+      result.add(fileStatus.getPath().toString());
+    }
+    return result;
+  }
+
+
+  @Test
+  public void testSimpleSequenceFile() throws IOException {
+
+    Path file1 = new Path(source + "/file1.seq");
+    createSeqFile(fs, file1);
+
+    Path file2 = new Path(source + "/file2.seq");
+    createSeqFile(fs, file2);
+
+    Map conf = getDefaultConfig();
+    HdfsSpout spout = makeSpout(0, conf, Configs.SEQ);
+
+    List<String> res = runSpout(spout, "r11", "a0", "a1", "a2", "a3", "a4");
+    for (String re : res) {
+      System.err.println(re);
+    }
+
+    listDir(source);
+
+
+    Path f1 = new Path(archive + "/file1.seq");
+    Path f2 = new Path(archive + "/file2.seq");
+
+    checkCollectorOutput_seq((MockCollector) spout.getCollector(), f1, f2);
+  }
+
+  // TODO: this test needs the spout to fail with an exception
+  @Test
+  public void testFailure() throws Exception {
+
+    Path file1 = new Path(source.toString() + "/file1.txt");
+    createTextFile(file1, 5);
+
+    listDir(source);
+
+    Map conf = getDefaultConfig();
+//    conf.put(HdfsSpout.Configs.BACKOFF_SEC, "2");
+    HdfsSpout spout = makeSpout(0, conf, MockTextFailingReader.class.getName());
+    List<String> res = runSpout(spout, "r3");
+    for (String re : res) {
+      System.err.println(re);
+    }
+
+    listCompletedDir();
+    List<String> badFiles = listBadDir();
+    Assert.assertEquals(1, badFiles.size());
+    Assert.assertEquals(1, ((MockCollector) spout.getCollector()).lines.size());
+  }
+
+  // @Test
+  public void testLocking() throws Exception {
+    Path file1 = new Path(source.toString() + "/file1.txt");
+    createTextFile(file1, 5);
+
+    listDir(source);
+
+    Map conf = getDefaultConfig();
+    conf.put(Configs.COMMIT_FREQ_COUNT, "1");
+    conf.put(Configs.COMMIT_FREQ_SEC, "1");
+    HdfsSpout spout = makeSpout(0, conf, Configs.TEXT);
+    List<String> res = runSpout(spout,"r4");
+    for (String re : res) {
+      System.err.println(re);
+    }
+    List<String> lockFiles = listDir(spout.getLockDirPath());
+    Assert.assertEquals(1, lockFiles.size());
+    runSpout(spout, "r3");
+    List<String> lines = readTextFile(fs, lockFiles.get(0));
+    System.err.println(lines);
+    Assert.assertEquals(6, lines.size());
+  }
+
+  private static List<String> readTextFile(FileSystem fs, String f) throws IOException {
+    Path file = new Path(f);
+    ArrayList<String> result = new ArrayList<>();
+    try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(file)))) {
+      for (String line = reader.readLine(); line != null; line = reader.readLine()) {
+        result.add(line);
+      }
+    }
+    return result;
+  }
+
+  private Map getDefaultConfig() {
+    Map conf = new HashMap();
+    conf.put(Configs.SOURCE_DIR, source.toString());
+    conf.put(Configs.ARCHIVE_DIR, archive.toString());
+    conf.put(Configs.BAD_DIR, badfiles.toString());
+    conf.put("filesystem", fs);
+    return conf;
+  }
+
+
+  private static HdfsSpout makeSpout(int spoutId, Map conf, String readerType) {
+    HdfsSpout spout = new HdfsSpout();
+    MockCollector collector = new MockCollector();
+    conf.put(Configs.READER_TYPE, readerType);
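+    // open() hands the MockCollector to the spout; tests later retrieve it via spout.getCollector().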
+    spout.open(conf, new MockTopologyContext(spoutId), collector);
+    return spout;
+  }
+
+  /**
+   * Executes a sequence of calls against the HdfsSpout.
+   *
+   * @param cmds commands to run, e.g. "r3", "a0", "f1". The commands are:
+   *   rN - call nextTuple() N times (N defaults to 1 if omitted)
+   *   aN - ack the Nth emitted item
+   *   fN - fail the Nth emitted item
+   */
+  private List<String> runSpout(HdfsSpout spout, String... cmds) {
+    MockCollector collector = (MockCollector) spout.getCollector();
+    for (String cmd : cmds) {
+      if (cmd.startsWith("r")) {
+        int count = 1;
+        if (cmd.length() > 1) {
+          count = Integer.parseInt(cmd.substring(1));
+        }
+        for (int i = 0; i < count; ++i) {
+          spout.nextTuple();
+        }
+      }
+      else if (cmd.startsWith("a")) {
+        int n = Integer.parseInt(cmd.substring(1));
+        Pair<HdfsSpout.MessageId, List<Object>> item = collector.items.get(n);
+        spout.ack(item.getKey());
+      }
+      else if (cmd.startsWith("f")) {
+        int n = Integer.parseInt(cmd.substring(1));
+        Pair<HdfsSpout.MessageId, List<Object>> item = collector.items.get(n);
+        spout.fail(item.getKey());
+      }
+    }
+    return collector.lines;
+  }
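+
+  // A sketch of typical usage (assuming a spout built with makeSpout above):
+  //   List<String> emitted = runSpout(spout, "r3", "a0", "f1");
+  // calls nextTuple() three times, acks the first emitted tuple, fails the second,
+  // and returns the string form of every tuple emitted so far.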
+
+  private void createTextFile(Path file, int lineCount) throws IOException {
+    FSDataOutputStream os = fs.create(file);
+    for (int i = 0; i < lineCount; i++) {
+      os.writeBytes("line " + i + System.lineSeparator());
+    }
+    os.close();
+  }
+
+
+
+  private static void createSeqFile(FileSystem fs, Path file) throws IOException {
+    Configuration conf = new Configuration();
+    if (fs.exists(file)) {
+      fs.delete(file, false);
+    }
+
+    SequenceFile.Writer w = SequenceFile.createWriter(fs, conf, file, IntWritable.class, Text.class);
+    try {
+      for (int i = 0; i < 5; i++) {
+        w.append(new IntWritable(i), new Text("line " + i));
+      }
+    } finally {
+      w.close();
+    }
+  }
+
+
+
+  static class MockCollector extends SpoutOutputCollector {
+    // string form of each emitted tuple, in emit order
+    public ArrayList<String> lines;
+    // (messageId, tuple) pairs, in emit order
+    public ArrayList<Pair<HdfsSpout.MessageId, List<Object>>> items;
+
+    public MockCollector() {
+      super(null);
+      lines = new ArrayList<>();
+      items = new ArrayList<>();
+    }
+
+
+
+    @Override
+    public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
+//      HdfsSpout.MessageId id = (HdfsSpout.MessageId) messageId;
+//      lines.add(id.toString() + ' ' + tuple.toString());
+      lines.add(tuple.toString());
+      items.add(HdfsUtils.Pair.of(messageId, tuple));
+      return null;
+    }
+
+    @Override
+    public void emitDirect(int arg0, String arg1, List<Object> arg2, Object arg3) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void reportError(Throwable arg0) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long getPendingCount() {
+      return 0;
+    }
+  } // class MockCollector
+
+
+
+  // Fails the 2nd read attempt with an IOException and every attempt from the 3rd onward with a ParseException
+  static class MockTextFailingReader extends TextFileReader {
+    int readAttempts = 0;
+
+    public MockTextFailingReader(FileSystem fs, Path file, Map conf) throws IOException {
+      super(fs, file, conf);
+    }
+
+    @Override
+    public List<Object> next() throws IOException, ParseException {
+      readAttempts++;
+      if (readAttempts == 2) {
+        throw new IOException("mock test exception");
+      } else if (readAttempts >= 3) {
+        throw new ParseException("mock test exception", null);
+      }
+      return super.next();
+    }
+  }
+
+  static class MockTopologyContext extends TopologyContext {
+    private final int componentId;
+
+    public MockTopologyContext(int componentId) {
+      // StormTopology topology, Map stormConf, Map<Integer, String> taskToComponent, Map<String, List<Integer>> componentToSortedTasks, Map<String, Map<String, Fields>> componentToStreamToFields, String stormId, String codeDir, String pidDir, Integer taskId, Integer workerPort, List<Integer> workerTasks, Map<String, Object> defaultResources, Map<String, Object> userResources, Map<String, Object> executorData, Map<Integer, Map<Integer, Map<String, IMetric>>> registeredMetrics, Atom openOrPrepareWasCalled
+      super(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null);
+      this.componentId = componentId;
+    }
+
+    @Override
+    public String getThisComponentId() {
+      return Integer.toString(componentId);
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/60e7a812/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestProgressTracker.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestProgressTracker.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestProgressTracker.java
new file mode 100644
index 0000000..1a00674
--- /dev/null
+++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/spout/TestProgressTracker.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hdfs.spout;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+
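+// Verifies ProgressTracker's commit-position bookkeeping as offsets are acked in and out of order.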
+public class TestProgressTracker {
+
+  private FileSystem fs;
+  private Configuration conf = new Configuration();
+
+  @Rule
+  public TemporaryFolder tempFolder = new TemporaryFolder();
+  public File baseFolder;
+
+  @Before
+  public void setUp() throws Exception {
+    fs = FileSystem.getLocal(conf);
+  }
+
+  @Test
+  public void testBasic() throws Exception {
+    ProgressTracker tracker = new ProgressTracker();
+    baseFolder = tempFolder.newFolder("trackertest");
+
+    Path file = new Path( baseFolder.toString() + Path.SEPARATOR + "testHeadTrimming.txt" );
+    createTextFile(file, 10);
+
+    // create reader and do some checks
+    TextFileReader reader = new TextFileReader(fs, file, null);
+    FileOffset pos0 = tracker.getCommitPosition();
+    Assert.assertNull(pos0);
+
+    TextFileReader.Offset currOffset = reader.getFileOffset();
+    Assert.assertNotNull(currOffset);
+    Assert.assertEquals(0, currOffset.byteOffset);
+
+    // read 1st line and ack
+    Assert.assertNotNull(reader.next());
+    TextFileReader.Offset pos1 = reader.getFileOffset();
+    tracker.recordAckedOffset(pos1);
+
+    TextFileReader.Offset pos1b = (TextFileReader.Offset) tracker.getCommitPosition();
+    Assert.assertEquals(pos1, pos1b);
+
+    // read 2nd line and ACK
+    Assert.assertNotNull(reader.next());
+    TextFileReader.Offset pos2 = reader.getFileOffset();
+    tracker.recordAckedOffset(pos2);
+
+    tracker.dumpState(System.err);
+    TextFileReader.Offset pos2b = (TextFileReader.Offset) tracker.getCommitPosition();
+    Assert.assertEquals(pos2, pos2b);
+
+
+    // read lines 3..7, don't ACK .. commit pos should remain same
+    Assert.assertNotNull(reader.next());//3
+    TextFileReader.Offset pos3 = reader.getFileOffset();
+    Assert.assertNotNull(reader.next());//4
+    TextFileReader.Offset pos4 = reader.getFileOffset();
+    Assert.assertNotNull(reader.next());//5
+    TextFileReader.Offset pos5 = reader.getFileOffset();
+    Assert.assertNotNull(reader.next());//6
+    TextFileReader.Offset pos6 = reader.getFileOffset();
+    Assert.assertNotNull(reader.next());//7
+    TextFileReader.Offset pos7 = reader.getFileOffset();
+
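+    // Expected invariant: the commit position is the highest offset up to which all earlier offsets have been acked.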
+    // now ack msg 5 and check
+    tracker.recordAckedOffset(pos5);
+    Assert.assertEquals(pos2, tracker.getCommitPosition()); // should remain unchanged @ 2
+    tracker.recordAckedOffset(pos4);
+    Assert.assertEquals(pos2, tracker.getCommitPosition()); // should remain unchanged @ 2
+    tracker.recordAckedOffset(pos3);
+    Assert.assertEquals(pos5, tracker.getCommitPosition()); // should be at 5
+
+    tracker.recordAckedOffset(pos6);
+    Assert.assertEquals(pos6, tracker.getCommitPosition()); // should be at 6
+    tracker.recordAckedOffset(pos6);                        // double ack on same msg
+    Assert.assertEquals(pos6, tracker.getCommitPosition()); // should still be at 6
+
+    tracker.recordAckedOffset(pos7);
+    Assert.assertEquals(pos7, tracker.getCommitPosition()); // should be at 7
+
+    tracker.dumpState(System.err);
+  }
+
+
+
+  private void createTextFile(Path file, int lineCount) throws IOException {
+    FSDataOutputStream os = fs.create(file);
+    for (int i = 0; i < lineCount; i++) {
+      os.writeBytes("line " + i + System.lineSeparator());
+    }
+    os.close();
+  }
+
+}