You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2014/12/18 00:36:23 UTC
svn commit: r1646365 - in /avro/trunk: CHANGES.txt lang/java/mapred/pom.xml
lang/java/mapred/src/main/java/org/apache/avro/mapred/FsInput.java
lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestFsInput.java
Author: cutting
Date: Wed Dec 17 23:36:22 2014
New Revision: 1646365
URL: http://svn.apache.org/r1646365
Log:
AVRO-1539. Java: Add FileSystem based FsInput constructor. Contributed by Allan Shoup.
Added:
avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestFsInput.java (with props)
Modified:
avro/trunk/CHANGES.txt
avro/trunk/lang/java/mapred/pom.xml
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/FsInput.java
Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1646365&r1=1646364&r2=1646365&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Wed Dec 17 23:36:22 2014
@@ -56,6 +56,9 @@ Trunk (not yet released)
AVRO-1616. Add IntelliJ files to .gitignore. (Niels Basjes via cutting)
+ AVRO-1539. Java: Add FileSystem based FsInput constructor.
+ (Allan Shoup via cutting)
+
BUG FIXES
AVRO-1553. Java: MapReduce never uses MapOutputValueSchema (tomwhite)
Modified: avro/trunk/lang/java/mapred/pom.xml
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/pom.xml?rev=1646365&r1=1646364&r2=1646365&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/pom.xml (original)
+++ avro/trunk/lang/java/mapred/pom.xml Wed Dec 17 23:36:22 2014
@@ -121,6 +121,13 @@
<classifier>tests</classifier>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>avro</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/FsInput.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/FsInput.java?rev=1646365&r1=1646364&r2=1646365&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/FsInput.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/FsInput.java Wed Dec 17 23:36:22 2014
@@ -23,6 +23,7 @@ import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.avro.file.SeekableInput;
@@ -34,26 +35,36 @@ public class FsInput implements Closeabl
/** Construct given a path and a configuration. */
public FsInput(Path path, Configuration conf) throws IOException {
- this.len = path.getFileSystem(conf).getFileStatus(path).getLen();
- this.stream = path.getFileSystem(conf).open(path);
+ this(path, path.getFileSystem(conf));
}
+ /** Construct given a path and a {@code FileSystem}. */
+ public FsInput(Path path, FileSystem fileSystem) throws IOException {
+ this.len = fileSystem.getFileStatus(path).getLen();
+ this.stream = fileSystem.open(path);
+ }
+
+ @Override
public long length() {
return len;
}
+ @Override
public int read(byte[] b, int off, int len) throws IOException {
return stream.read(b, off, len);
}
+ @Override
public void seek(long p) throws IOException {
stream.seek(p);
}
+ @Override
public long tell() throws IOException {
return stream.getPos();
}
+ @Override
public void close() throws IOException {
stream.close();
}
Added: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestFsInput.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestFsInput.java?rev=1646365&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestFsInput.java (added)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestFsInput.java Wed Dec 17 23:36:22 2014
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.apache.avro.mapreduce;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.charset.Charset;
+
+import org.apache.avro.AvroTestUtil;
+import org.apache.avro.mapred.FsInput;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestFsInput {
+ private static File file;
+ private static final String FILE_CONTENTS = "abcdefghijklmnopqrstuvwxyz";
+ private Configuration conf;
+ private FsInput fsInput;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ File directory = AvroTestUtil.tempDirectory(TestFsInput.class, "file");
+ file = new File(directory, "file.txt");
+ PrintWriter out = new PrintWriter(new OutputStreamWriter(new FileOutputStream(file), Charset.forName("UTF-8")));
+ try {
+ out.print(FILE_CONTENTS);
+ } finally {
+ out.close();
+ }
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ conf = new Configuration();
+ conf.set("fs.default.name", "file:///");
+ fsInput = new FsInput(new Path(file.getPath()), conf);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (fsInput != null) {
+ fsInput.close();
+ }
+ }
+
+ @Test
+ public void testConfigurationConstructor() throws Exception {
+ FsInput in = new FsInput(new Path(file.getPath()), conf);
+ try {
+ int expectedByteCount = 1;
+ byte[] readBytes = new byte[expectedByteCount];
+ int actualByteCount = fsInput.read(readBytes, 0, expectedByteCount);
+ assertThat(actualByteCount, is(equalTo(expectedByteCount)));
+ } finally {
+ in.close();
+ }
+ }
+
+ @Test
+ public void testFileSystemConstructor() throws Exception {
+ Path path = new Path(file.getPath());
+ FileSystem fs = path.getFileSystem(conf);
+ FsInput in = new FsInput(path, fs);
+ try {
+ int expectedByteCount = 1;
+ byte[] readBytes = new byte[expectedByteCount];
+ int actualByteCount = fsInput.read(readBytes, 0, expectedByteCount);
+ assertThat(actualByteCount, is(equalTo(expectedByteCount)));
+ } finally {
+ in.close();
+ }
+ }
+
+ @Test
+ public void testLength() throws IOException {
+ assertEquals(fsInput.length(), FILE_CONTENTS.length());
+ }
+
+ @Test
+ public void testRead() throws Exception {
+ byte[] expectedBytes = FILE_CONTENTS.getBytes(Charset.forName("UTF-8"));
+ byte[] actualBytes = new byte[expectedBytes.length];
+ int actualByteCount = fsInput.read(actualBytes, 0, actualBytes.length);
+
+ assertThat(actualBytes, is(equalTo(expectedBytes)));
+ assertThat(actualByteCount, is(equalTo(expectedBytes.length)));
+ }
+
+ @Test
+ public void testSeek() throws Exception {
+ int seekPos = FILE_CONTENTS.length() / 2;
+ byte[] fileContentBytes = FILE_CONTENTS.getBytes(Charset.forName("UTF-8"));
+ byte expectedByte = fileContentBytes[seekPos];
+ fsInput.seek(seekPos);
+ byte[] readBytes = new byte[1];
+ fsInput.read(readBytes, 0, 1);
+ byte actualByte = readBytes[0];
+ assertThat(actualByte, is(equalTo(expectedByte)));
+ }
+
+ @Test
+ public void testTell() throws Exception {
+ long expectedTellPos = FILE_CONTENTS.length() / 2;
+ fsInput.seek(expectedTellPos);
+ long actualTellPos = fsInput.tell();
+ assertThat(actualTellPos, is(equalTo(expectedTellPos)));
+ }
+
+}
Propchange: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestFsInput.java
------------------------------------------------------------------------------
svn:eol-style = native