You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2014/12/18 00:36:23 UTC

svn commit: r1646365 - in /avro/trunk: CHANGES.txt lang/java/mapred/pom.xml lang/java/mapred/src/main/java/org/apache/avro/mapred/FsInput.java lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestFsInput.java

Author: cutting
Date: Wed Dec 17 23:36:22 2014
New Revision: 1646365

URL: http://svn.apache.org/r1646365
Log:
AVRO-1539. Java: Add FileSystem based FsInput constructor.  Contributed by Allan Shoup.

Added:
    avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestFsInput.java   (with props)
Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/lang/java/mapred/pom.xml
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/FsInput.java

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1646365&r1=1646364&r2=1646365&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Wed Dec 17 23:36:22 2014
@@ -56,6 +56,9 @@ Trunk (not yet released)
 
     AVRO-1616. Add IntelliJ files to .gitignore. (Niels Basjes via cutting)
 
+    AVRO-1539. Java: Add FileSystem based FsInput constructor.
+    (Allan Shoup via cutting)
+
   BUG FIXES
 
     AVRO-1553. Java: MapReduce never uses MapOutputValueSchema (tomwhite)

Modified: avro/trunk/lang/java/mapred/pom.xml
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/pom.xml?rev=1646365&r1=1646364&r2=1646365&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/pom.xml (original)
+++ avro/trunk/lang/java/mapred/pom.xml Wed Dec 17 23:36:22 2014
@@ -121,6 +121,13 @@
       <classifier>tests</classifier>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>avro</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
     <dependency>     
       <groupId>org.easymock</groupId>
       <artifactId>easymock</artifactId>

Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/FsInput.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/FsInput.java?rev=1646365&r1=1646364&r2=1646365&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/FsInput.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/FsInput.java Wed Dec 17 23:36:22 2014
@@ -23,6 +23,7 @@ import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FSDataInputStream;
 
 import org.apache.avro.file.SeekableInput;
@@ -34,26 +35,36 @@ public class FsInput implements Closeabl
 
   /** Construct given a path and a configuration. */
   public FsInput(Path path, Configuration conf) throws IOException {
-    this.len = path.getFileSystem(conf).getFileStatus(path).getLen();
-    this.stream = path.getFileSystem(conf).open(path);
+    this(path, path.getFileSystem(conf));
   }
 
+  /** Construct given a path and a {@code FileSystem}. */
+  public FsInput(Path path, FileSystem fileSystem) throws IOException {
+    this.len = fileSystem.getFileStatus(path).getLen();
+    this.stream = fileSystem.open(path);
+  }
+
+  @Override
   public long length() {
     return len;
   }
 
+  @Override
   public int read(byte[] b, int off, int len) throws IOException {
     return stream.read(b, off, len);
   }
 
+  @Override
   public void seek(long p) throws IOException {
     stream.seek(p);
   }
 
+  @Override
   public long tell() throws IOException {
     return stream.getPos();
   }
 
+  @Override
   public void close() throws IOException {
     stream.close();
   }

Added: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestFsInput.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestFsInput.java?rev=1646365&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestFsInput.java (added)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestFsInput.java Wed Dec 17 23:36:22 2014
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.avro.mapreduce;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.charset.Charset;
+
+import org.apache.avro.AvroTestUtil;
+import org.apache.avro.mapred.FsInput;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestFsInput {
+  private static File file;
+  private static final String FILE_CONTENTS = "abcdefghijklmnopqrstuvwxyz";
+  private Configuration conf;
+  private FsInput fsInput;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    File directory = AvroTestUtil.tempDirectory(TestFsInput.class, "file");
+    file = new File(directory, "file.txt");
+    PrintWriter out = new PrintWriter(new OutputStreamWriter(new FileOutputStream(file), Charset.forName("UTF-8")));
+    try {
+      out.print(FILE_CONTENTS);
+    } finally {
+      out.close();
+    }
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    conf = new Configuration();
+    conf.set("fs.default.name", "file:///");
+    fsInput = new FsInput(new Path(file.getPath()), conf);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (fsInput != null) {
+      fsInput.close();
+    }
+  }
+
+  @Test
+  public void testConfigurationConstructor() throws Exception {
+    FsInput in = new FsInput(new Path(file.getPath()), conf);
+    try {
+      int expectedByteCount = 1;
+      byte[] readBytes = new byte[expectedByteCount];
+      int actualByteCount = fsInput.read(readBytes, 0, expectedByteCount);
+      assertThat(actualByteCount, is(equalTo(expectedByteCount)));
+    } finally {
+      in.close();
+    }
+  }
+
+  @Test
+  public void testFileSystemConstructor() throws Exception {
+    Path path = new Path(file.getPath());
+    FileSystem fs = path.getFileSystem(conf);
+    FsInput in = new FsInput(path, fs);
+    try {
+      int expectedByteCount = 1;
+      byte[] readBytes = new byte[expectedByteCount];
+      int actualByteCount = fsInput.read(readBytes, 0, expectedByteCount);
+      assertThat(actualByteCount, is(equalTo(expectedByteCount)));
+    } finally {
+      in.close();
+    }
+  }
+
+  @Test
+  public void testLength() throws IOException {
+    assertEquals(fsInput.length(), FILE_CONTENTS.length());
+  }
+
+  @Test
+  public void testRead() throws Exception {
+    byte[] expectedBytes = FILE_CONTENTS.getBytes(Charset.forName("UTF-8"));
+    byte[] actualBytes = new byte[expectedBytes.length];
+    int actualByteCount = fsInput.read(actualBytes, 0, actualBytes.length);
+
+    assertThat(actualBytes, is(equalTo(expectedBytes)));
+    assertThat(actualByteCount, is(equalTo(expectedBytes.length)));
+  }
+
+  @Test
+  public void testSeek() throws Exception {
+    int seekPos = FILE_CONTENTS.length() / 2;
+    byte[] fileContentBytes = FILE_CONTENTS.getBytes(Charset.forName("UTF-8"));
+    byte expectedByte = fileContentBytes[seekPos];
+    fsInput.seek(seekPos);
+    byte[] readBytes = new byte[1];
+    fsInput.read(readBytes, 0, 1);
+    byte actualByte = readBytes[0];
+    assertThat(actualByte, is(equalTo(expectedByte)));
+  }
+
+  @Test
+  public void testTell() throws Exception {
+    long expectedTellPos = FILE_CONTENTS.length() / 2;
+    fsInput.seek(expectedTellPos);
+    long actualTellPos = fsInput.tell();
+    assertThat(actualTellPos, is(equalTo(expectedTellPos)));
+  }
+
+}

Propchange: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestFsInput.java
------------------------------------------------------------------------------
    svn:eol-style = native