Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/10/03 02:18:00 UTC
svn commit: r452282 - in /lucene/hadoop/trunk: ./ src/c++/libhdfs/
src/java/org/apache/hadoop/dfs/ src/java/org/apache/hadoop/fs/
src/test/org/apache/hadoop/dfs/
Author: cutting
Date: Mon Oct 2 17:17:59 2006
New Revision: 452282
URL: http://svn.apache.org/viewvc?view=rev&rev=452282
Log:
HADOOP-519. Add positioned read methods to FSInputStream. Contributed by Milind.
Added:
lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/PositionedReadable.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestPread.java
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/c++/libhdfs/hdfs.c
lucene/hadoop/trunk/src/c++/libhdfs/hdfs.h
lucene/hadoop/trunk/src/c++/libhdfs/hdfs_test.c
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataInputStream.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSInputStream.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=452282&r1=452281&r2=452282
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Mon Oct 2 17:17:59 2006
@@ -106,6 +106,12 @@
26. HADOOP-566. Fix scripts to work correctly when accessed through
relative symbolic links. (Lee Faris via cutting)
+27. HADOOP-519. Add positioned read methods to FSInputStream. These
+ permit one to read from a stream without moving its position, and
+ can hence be performed by multiple threads at once on a single
+ stream. Implement an optimized version for DFS and local FS.
+ (Milind Bhandarkar via cutting)
+
Release 0.6.2 - 2006-09-18
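
The new methods added here let several threads read from one open stream at once, because a positioned read never moves the stream's offset. A minimal usage sketch, assuming an existing HDFS file at an illustrative path (none of these names come from this commit):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class PreadSketch {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        // Both threads share one stream; neither calls seek(), so
        // neither disturbs the other's reads.
        final FSDataInputStream in = fs.open(new Path("/data/sample.dat"));
        Thread t1 = new Thread() {
          public void run() {
            byte[] buf = new byte[4096];
            try { in.readFully(0L, buf); }        // bytes 0..4095
            catch (IOException e) { e.printStackTrace(); }
          }
        };
        Thread t2 = new Thread() {
          public void run() {
            byte[] buf = new byte[4096];
            try { in.readFully(4096L, buf); }     // bytes 4096..8191
            catch (IOException e) { e.printStackTrace(); }
          }
        };
        t1.start(); t2.start();
        t1.join(); t2.join();
        in.close();
      }
    }
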
Modified: lucene/hadoop/trunk/src/c++/libhdfs/hdfs.c
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/c%2B%2B/libhdfs/hdfs.c?view=diff&rev=452282&r1=452281&r2=452282
==============================================================================
--- lucene/hadoop/trunk/src/c++/libhdfs/hdfs.c (original)
+++ lucene/hadoop/trunk/src/c++/libhdfs/hdfs.c Mon Oct 2 17:17:59 2006
@@ -559,6 +559,58 @@
return noReadBytes;
}
+tSize hdfsPread(hdfsFS fs, hdfsFile f, tOffset position, void* buffer, tSize length)
+{
+ // JAVA EQUIVALENT:
+ // byte [] bR = new byte[length];
+ // fis.read(pos, bR, 0, length);
+
+ //Get the JNIEnv* corresponding to current thread
+ JNIEnv* env = getJNIEnv();
+
+ //Parameters
+ jobject jFS = (jobject)fs;
+ jobject jInputStream = (jobject)(f ? f->file : NULL);
+
+ jthrowable jException;
+ jbyteArray jbRarray;
+ jint noReadBytes = 0;
+
+ //Sanity check
+ if (!f || f->type == UNINITIALIZED) {
+ errno = EBADF;
+ return -1;
+ }
+
+ //Error checking... make sure that this file is 'readable'
+ if (f->type != INPUT) {
+ fprintf(stderr, "Cannot read from a non-InputStream object!\n");
+ errno = EINVAL;
+ return -1;
+ }
+
+ //Read the requisite bytes
+ jbRarray = (*env)->NewByteArray(env, length);
+ if (invokeMethod(env, (RetVal*)&noReadBytes, &jException, INSTANCE,
+ jInputStream, "org/apache/hadoop/fs/FSDataInputStream",
+ "read", "(J[BII)I", position, jbRarray, 0, length) != 0) {
+ fprintf(stderr,
+ "Call to org.apache.hadoop.fs.FSDataInputStream::read failed!\n");
+ errno = EINTERNAL;
+ noReadBytes = -1;
+ } else {
+ if(noReadBytes > 0) {
+ (*env)->GetByteArrayRegion(env, jbRarray, 0, noReadBytes, buffer);
+ }
+ //This is a valid case: there aren't any bytes left to read!
+ errno = 0;
+ }
+ (*env)->ReleaseByteArrayElements(env, jbRarray,
+ (*env)->GetByteArrayElements(env, jbRarray, 0), JNI_ABORT);
+
+ return noReadBytes;
+}
+
tSize hdfsWrite(hdfsFS fs, hdfsFile f, const void* buffer, tSize length)
{
// JAVA EQUIVALENT
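
For readers not fluent in JNI, hdfsPread above is roughly the following Java, expanded from its "JAVA EQUIVALENT" comment (a sketch; fis and buffer are stand-ins for the wrapped FSDataInputStream and the caller's C buffer):

    // What the JNI call sequence amounts to on the Java side (sketch).
    byte[] bR = new byte[length];
    int noReadBytes = fis.read(position, bR, 0, length);   // positioned read
    if (noReadBytes > 0) {
      // GetByteArrayRegion copies the Java bytes out to the C buffer.
      System.arraycopy(bR, 0, buffer, 0, noReadBytes);
    }
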
Modified: lucene/hadoop/trunk/src/c++/libhdfs/hdfs.h
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/c%2B%2B/libhdfs/hdfs.h?view=diff&rev=452282&r1=452281&r2=452282
==============================================================================
--- lucene/hadoop/trunk/src/c++/libhdfs/hdfs.h (original)
+++ lucene/hadoop/trunk/src/c++/libhdfs/hdfs.h Mon Oct 2 17:17:59 2006
@@ -142,6 +142,17 @@
tSize hdfsRead(hdfsFS fs, hdfsFile file, void* buffer, tSize length);
/**
+ * hdfsPread - Positional read of data from an open file.
+ * @param fs The configured filesystem handle.
+ * @param file The file handle.
+ * @param position Position from which to read
+ * @param buffer The buffer to copy read bytes into.
+ * @param length The length of the buffer.
+ * @return Returns the number of bytes actually read, possibly less than length; -1 on error.
+ */
+ tSize hdfsPread(hdfsFS fs, hdfsFile file, tOffset position, void* buffer, tSize length);
+
+ /**
* hdfsWrite - Write data into an open file.
* @param fs The configured filesystem handle.
* @param file The file handle.
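
Because hdfsPread may deliver fewer bytes than requested, a caller that needs the whole range has to loop until satisfied. That loop is exactly what the Java FSInputStream.readFully added later in this commit does; sketched here for reference (names illustrative):

    // Keep issuing positioned reads until 'length' bytes have arrived
    // or the stream ends early (sketch of the caller-side contract).
    int done = 0;
    while (done < length) {
      int n = in.read(position + done, buffer, offset + done, length - done);
      if (n < 0) { throw new EOFException("EOF before reading fully"); }
      done += n;
    }
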
Modified: lucene/hadoop/trunk/src/c++/libhdfs/hdfs_test.c
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/c%2B%2B/libhdfs/hdfs_test.c?view=diff&rev=452282&r1=452281&r2=452282
==============================================================================
--- lucene/hadoop/trunk/src/c++/libhdfs/hdfs_test.c (original)
+++ lucene/hadoop/trunk/src/c++/libhdfs/hdfs_test.c Mon Oct 2 17:17:59 2006
@@ -97,6 +97,11 @@
fprintf(stderr, "Read following %d bytes:\n%s\n",
num_read_bytes, buffer);
+ num_read_bytes = hdfsPread(fs, readFile, 0, (void*)buffer,
+ sizeof(buffer));
+ fprintf(stderr, "Read following %d bytes:\n%s\n",
+ num_read_bytes, buffer);
+
hdfsCloseFile(fs, readFile);
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?view=diff&rev=452282&r1=452281&r2=452282
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Mon Oct 2 17:17:59 2006
@@ -464,6 +464,16 @@
}
}
+ /** Utility class to encapsulate data node info and its ip address. */
+ private static class DNAddrPair {
+ DatanodeInfo info;
+ InetSocketAddress addr;
+ DNAddrPair(DatanodeInfo info, InetSocketAddress addr) {
+ this.info = info;
+ this.addr = addr;
+ }
+ }
+
/****************************************************************
* DFSInputStream provides bytes from a named file. It handles
* negotiation of the namenode and various datanodes as necessary.
@@ -494,7 +504,7 @@
/**
* Grab the open-file info from namenode
*/
- void openInfo() throws IOException {
+ synchronized void openInfo() throws IOException {
Block oldBlocks[] = this.blocks;
LocatedBlock results[] = namenode.open(src);
@@ -560,33 +570,12 @@
// Connect to best DataNode for desired Block, with potential offset
//
int failures = 0;
- InetSocketAddress targetAddr = null;
TreeSet deadNodes = new TreeSet();
while (s == null) {
- DatanodeInfo chosenNode;
-
- try {
- chosenNode = bestNode(nodes[targetBlock], deadNodes);
- targetAddr = DataNode.createSocketAddr(chosenNode.getName());
- } catch (IOException ie) {
- String blockInfo =
- blocks[targetBlock]+" file="+src+" offset="+target;
- if (failures >= MAX_BLOCK_ACQUIRE_FAILURES) {
- throw new IOException("Could not obtain block: " + blockInfo);
- }
- if (nodes[targetBlock] == null || nodes[targetBlock].length == 0) {
- LOG.info("No node available for block: " + blockInfo);
- }
- LOG.info("Could not obtain block from any node: " + ie);
- try {
- Thread.sleep(3000);
- } catch (InterruptedException iex) {
- }
- deadNodes.clear();
- openInfo();
- failures++;
- continue;
- }
+ DNAddrPair retval = chooseDataNode(targetBlock, deadNodes);
+ DatanodeInfo chosenNode = retval.info;
+ InetSocketAddress targetAddr = retval.addr;
+
try {
s = new Socket();
s.connect(targetAddr, READ_TIMEOUT);
@@ -704,11 +693,142 @@
return -1;
}
+
+ private DNAddrPair chooseDataNode(int blockId, TreeSet deadNodes)
+ throws IOException {
+ int failures = 0;
+ while (true) {
+ try {
+ DatanodeInfo chosenNode = bestNode(nodes[blockId], deadNodes);
+ InetSocketAddress targetAddr = DataNode.createSocketAddr(chosenNode.getName());
+ return new DNAddrPair(chosenNode, targetAddr);
+ } catch (IOException ie) {
+ String blockInfo =
+ blocks[blockId]+" file="+src;
+ if (failures >= MAX_BLOCK_ACQUIRE_FAILURES) {
+ throw new IOException("Could not obtain block: " + blockInfo);
+ }
+ if (nodes[blockId] == null || nodes[blockId].length == 0) {
+ LOG.info("No node available for block: " + blockInfo);
+ }
+ LOG.info("Could not obtain block from any node: " + ie);
+ try {
+ Thread.sleep(3000);
+ } catch (InterruptedException iex) {
+ }
+ deadNodes.clear();
+ openInfo();
+ failures++;
+ continue;
+ }
+ }
+ }
+
+ private void fetchBlockByteRange(int blockId, long start,
+ long end, byte[] buf, int offset) throws IOException {
+ //
+ // Connect to best DataNode for desired Block, with potential offset
+ //
+ TreeSet deadNodes = new TreeSet();
+ Socket dn = null;
+ while (dn == null) {
+ DNAddrPair retval = chooseDataNode(blockId, deadNodes);
+ DatanodeInfo chosenNode = retval.info;
+ InetSocketAddress targetAddr = retval.addr;
+
+ try {
+ dn = new Socket();
+ dn.connect(targetAddr, READ_TIMEOUT);
+ dn.setSoTimeout(READ_TIMEOUT);
+
+ //
+ // Xmit header info to datanode
+ //
+ DataOutputStream out = new DataOutputStream(new BufferedOutputStream(dn.getOutputStream()));
+ out.write(OP_READ_RANGE_BLOCK);
+ blocks[blockId].write(out);
+ out.writeLong(start);
+ out.writeLong(end);
+ out.flush();
+
+ //
+ // Get bytes in block, set streams
+ //
+ DataInputStream in = new DataInputStream(new BufferedInputStream(dn.getInputStream()));
+ long curBlockSize = in.readLong();
+ long actualStart = in.readLong();
+ long actualEnd = in.readLong();
+ if (curBlockSize != blocks[blockId].len) {
+ throw new IOException("Recorded block size is " +
+ blocks[blockId].len + ", but datanode reports size of " +
+ curBlockSize);
+ }
+ if ((actualStart != start) || (actualEnd != end)) {
+ throw new IOException("Asked for byte range " + start +
+ "-" + end + ", but only received range " + actualStart +
+ "-" + actualEnd);
+ }
+ int nread = in.read(buf, offset, (int)(end - start + 1));
+ } catch (IOException ex) {
+ // Put chosen node into dead list, continue
+ LOG.info("Failed to connect to " + targetAddr + ":" + ex);
+ deadNodes.add(chosenNode);
+ if (dn != null) {
+ try {
+ dn.close();
+ } catch (IOException iex) {
+ }
+ }
+ dn = null;
+ }
+ }
+ }
+
+ public int read(long position, byte[] buf, int off, int len)
+ throws IOException {
+ // sanity checks
+ checkOpen();
+ if (closed) {
+ throw new IOException("Stream closed");
+ }
+ if ((position < 0) || (position > filelen)) {
+ return -1;
+ }
+ int realLen = len;
+ if ((position + len) > filelen) {
+ realLen = (int)(filelen - position);
+ }
+ // determine the block and byte range within the block
+ // corresponding to position and realLen
+ int targetBlock = -1;
+ long targetStart = 0;
+ long targetEnd = 0;
+ for (int idx = 0; idx < blocks.length; idx++) {
+ long blocklen = blocks[idx].getNumBytes();
+ targetEnd = targetStart + blocklen - 1;
+ if (position >= targetStart && position <= targetEnd) {
+ targetBlock = idx;
+ targetStart = position - targetStart;
+ targetEnd = Math.min(blocklen, targetStart + realLen) - 1;
+ realLen = (int)(targetEnd - targetStart + 1);
+ break;
+ }
+ targetStart += blocklen;
+ }
+ if (targetBlock < 0) {
+ throw new IOException(
+ "Impossible situation: could not find target position "+
+ position);
+ }
+ fetchBlockByteRange(targetBlock, targetStart, targetEnd, buf, off);
+ return realLen;
+ }
+
/**
* Seek to a new arbitrary location
*/
public synchronized void seek(long targetPos) throws IOException {
- if (targetPos >= filelen) {
+ if (targetPos > filelen) {
throw new IOException("Cannot seek after EOF");
}
pos = targetPos;
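
Note that read(long, byte[], int, int) above truncates each call at a block boundary; readFully's loop (in FSInputStream, below) stitches together ranges that span blocks. Walking the arithmetic with assumed values, blockSize = 4096 and a call read(6000, buf, 0, 3000) (numbers illustrative):

    // idx 0: targetStart = 0,    targetEnd = 4095  -> 6000 not in range
    // idx 1: targetStart = 4096, targetEnd = 8191  -> hit, targetBlock = 1
    //        within-block start = 6000 - 4096               = 1904
    //        within-block end   = min(4096, 1904 + 3000) - 1 = 4095
    //        realLen            = 4095 - 1904 + 1            = 2192
    // The call fetches bytes 1904..4095 of block 1 and returns 2192,
    // not 3000; readFully obtains the remaining 808 bytes with a
    // second call against block 2.
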
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?view=diff&rev=452282&r1=452281&r2=452282
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Mon Oct 2 17:17:59 2006
@@ -510,7 +510,8 @@
byte op = (byte) in.read();
if (op == OP_WRITE_BLOCK) {
writeBlock(in);
- } else if (op == OP_READ_BLOCK || op == OP_READSKIP_BLOCK) {
+ } else if (op == OP_READ_BLOCK || op == OP_READSKIP_BLOCK ||
+ op == OP_READ_RANGE_BLOCK) {
readBlock(in, op);
} else {
while (op >= 0) {
@@ -548,8 +549,12 @@
b.readFields(in);
long toSkip = 0;
+ long endOffset = -1;
if (op == OP_READSKIP_BLOCK) {
toSkip = in.readLong();
+ } else if (op == OP_READ_RANGE_BLOCK) {
+ toSkip = in.readLong();
+ endOffset = in.readLong();
}
//
@@ -567,14 +572,15 @@
// Get blockdata from disk
//
long len = data.getLength(b);
+ if (endOffset < 0) { endOffset = len; }
DataInputStream in2 = new DataInputStream(data.getBlockData(b));
out.writeLong(len);
- if (op == OP_READSKIP_BLOCK) {
+ long amtSkipped = 0;
+ if ((op == OP_READSKIP_BLOCK) || (op == OP_READ_RANGE_BLOCK)) {
if (toSkip > len) {
toSkip = len;
}
- long amtSkipped = 0;
try {
amtSkipped = in2.skip(toSkip);
} catch (IOException iex) {
@@ -583,26 +589,35 @@
}
out.writeLong(amtSkipped);
}
+ if (op == OP_READ_RANGE_BLOCK) {
+ if (endOffset > len) {
+ endOffset = len;
+ }
+ out.writeLong(endOffset);
+ }
byte buf[] = new byte[BUFFER_SIZE];
try {
+ int toRead = (int) (endOffset - amtSkipped + 1);
int bytesRead = 0;
try {
- bytesRead = in2.read(buf);
+ bytesRead = in2.read(buf, 0, Math.min(BUFFER_SIZE, toRead));
myMetrics.readBytes(bytesRead);
} catch (IOException iex) {
shutdown();
throw iex;
}
- while (bytesRead >= 0) {
+ while (toRead > 0 && bytesRead >= 0) {
out.write(buf, 0, bytesRead);
- len -= bytesRead;
+ toRead -= bytesRead;
+ if (toRead > 0) {
try {
- bytesRead = in2.read(buf);
+ bytesRead = in2.read(buf, 0, Math.min(BUFFER_SIZE, toRead));
myMetrics.readBytes(bytesRead);
} catch (IOException iex) {
shutdown();
throw iex;
+ }
}
}
} catch (SocketException se) {
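
Pieced together from DFSClient.fetchBlockByteRange and the readBlock code above, the OP_READ_RANGE_BLOCK exchange frames as follows (a reading of this diff, not a protocol specification):

    client -> datanode:  byte op          OP_READ_RANGE_BLOCK (83)
                         Block            the target block, as a Writable
                         long start       first byte wanted
                         long end         last byte wanted, inclusive
    datanode -> client:  long len         recorded length of the block
                         long amtSkipped  actual start of the returned range
                         long endOffset   actual end of the returned range
                         raw bytes of the range, in BUFFER_SIZE chunks
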
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java?view=diff&rev=452282&r1=452281&r2=452282
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java Mon Oct 2 17:17:59 2006
@@ -86,6 +86,7 @@
public static final byte OP_WRITE_BLOCK = (byte) 80;
public static final byte OP_READ_BLOCK = (byte) 81;
public static final byte OP_READSKIP_BLOCK = (byte) 82;
+ public static final byte OP_READ_RANGE_BLOCK = (byte) 83;
// Encoding types
public static final byte RUNLENGTH_ENCODING = 0;
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataInputStream.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataInputStream.java?view=diff&rev=452282&r1=452281&r2=452282
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataInputStream.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSDataInputStream.java Mon Oct 2 17:17:59 2006
@@ -26,7 +26,8 @@
/** Utility that wraps a {@link FSInputStream} in a {@link DataInputStream}
* and buffers input through a {@link BufferedInputStream}. */
-public class FSDataInputStream extends DataInputStream {
+public class FSDataInputStream extends DataInputStream
+ implements Seekable, PositionedReadable {
private static final Log LOG =
LogFactory.getLog("org.apache.hadoop.fs.DataInputStream");
@@ -36,7 +37,8 @@
private int bytesPerSum = 1;
/** Verify that data matches checksums. */
- private class Checker extends FilterInputStream implements Seekable {
+ private class Checker extends FilterInputStream
+ implements Seekable, PositionedReadable {
private FileSystem fs;
private Path file;
private FSDataInputStream sums;
@@ -139,6 +141,21 @@
return ((FSInputStream)in).getPos();
}
+ public int read(long position, byte[] buffer, int offset, int length)
+ throws IOException {
+ return ((FSInputStream)in).read(position, buffer, offset, length);
+ }
+
+ public void readFully(long position, byte[] buffer, int offset, int length)
+ throws IOException {
+ ((FSInputStream)in).readFully(position, buffer, offset, length);
+ }
+
+ public void readFully(long position, byte[] buffer)
+ throws IOException {
+ ((FSInputStream)in).readFully(position, buffer);
+ }
+
public void close() throws IOException {
super.close();
stopSumming();
@@ -181,6 +198,16 @@
return position; // return cached position
}
+ public int read(long position, byte[] buffer, int offset, int length)
+ throws IOException {
+ return ((PositionedReadable)in).read(position, buffer, offset, length);
+ }
+
+ public void readFully(long position, byte[] buffer, int offset, int length)
+ throws IOException {
+ ((PositionedReadable)in).readFully(position, buffer, offset, length);
+ }
+
}
/** Buffer input. This improves performance significantly.*/
@@ -224,6 +251,15 @@
return buf[pos++] & 0xff;
}
+ public int read(long position, byte[] buffer, int offset, int length)
+ throws IOException {
+ return ((PositionCache)in).read(position, buffer, offset, length);
+ }
+
+ public void readFully(long position, byte[] buffer, int offset, int length)
+ throws IOException {
+ ((PositionCache)in).readFully(position, buffer, offset, length);
+ }
}
@@ -252,7 +288,7 @@
this.in = new Buffer(new PositionCache(in), bufferSize);
}
- public void seek(long desired) throws IOException {
+ public synchronized void seek(long desired) throws IOException {
((Buffer)in).seek(desired);
}
@@ -260,4 +296,18 @@
return ((Buffer)in).getPos();
}
+ public int read(long position, byte[] buffer, int offset, int length)
+ throws IOException {
+ return ((Buffer)in).read(position, buffer, offset, length);
+ }
+
+ public void readFully(long position, byte[] buffer, int offset, int length)
+ throws IOException {
+ ((Buffer)in).readFully(position, buffer, offset, length);
+ }
+
+ public void readFully(long position, byte[] buffer)
+ throws IOException {
+ ((Buffer)in).readFully(position, buffer, 0, buffer.length);
+ }
}
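
Each wrapper layer here (Checker, PositionCache, Buffer) forwards positioned reads straight inward, so a pread bypasses both the read buffer and, in the Checker, checksum verification, and it never moves the cached stream position. A quick check of that last property (sketch; the path is illustrative):

    FSDataInputStream in = fs.open(new Path("/data/sample.dat"));
    long before = in.getPos();
    byte[] b = new byte[128];
    in.readFully(1024L, b);          // positioned read of bytes 1024..1151
    // The stream's own position is untouched by the pread.
    assert in.getPos() == before;
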
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSInputStream.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSInputStream.java?view=diff&rev=452282&r1=452281&r2=452282
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSInputStream.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/FSInputStream.java Mon Oct 2 17:17:59 2006
@@ -23,7 +23,8 @@
*
* @author Mike Cafarella
*****************************************************************/
-public abstract class FSInputStream extends InputStream implements Seekable {
+public abstract class FSInputStream extends InputStream
+ implements Seekable, PositionedReadable {
/**
* Seek to the given offset from the start of the file.
* The next read() will be from that location. Can't
@@ -35,4 +36,36 @@
* Return the current offset from the start of the file
*/
public abstract long getPos() throws IOException;
+
+ public int read(long position, byte[] buffer, int offset, int length)
+ throws IOException {
+ synchronized (this) {
+ long oldPos = getPos();
+ int nread = -1;
+ try {
+ seek(position);
+ nread = read(buffer, offset, length);
+ } finally {
+ seek(oldPos);
+ }
+ return nread;
+ }
+ }
+
+ public void readFully(long position, byte[] buffer, int offset, int length)
+ throws IOException {
+ int nread = 0;
+ while (nread < length) {
+ int nbytes = read(position+nread, buffer, offset+nread, length-nread);
+ if (nbytes < 0) {
+ throw new EOFException("End of file reached before reading fully.");
+ }
+ nread += nbytes;
+ }
+ }
+
+ public void readFully(long position, byte[] buffer)
+ throws IOException {
+ readFully(position, buffer, 0, buffer.length);
+ }
}
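
The defaults above give the two read flavors distinct end-of-file behavior: read(position, ...) may return a short count or -1, while readFully throws once EOF arrives short of length. Assuming a 100-byte file (a sketch, not from the commit):

    byte[] b = new byte[50];
    int n = in.read(90L, b, 0, 50);   // n <= 10: only 10 bytes remain
    in.readFully(90L, b, 0, 10);      // fine: exactly 10 bytes available
    in.readFully(90L, b, 0, 50);      // throws EOFException
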
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java?view=diff&rev=452282&r1=452281&r2=452282
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/LocalFileSystem.java Mon Oct 2 17:17:59 2006
@@ -17,6 +17,7 @@
package org.apache.hadoop.fs;
import java.io.*;
+import java.nio.ByteBuffer;
import java.util.*;
import java.nio.channels.*;
import org.apache.hadoop.conf.Configuration;
@@ -113,6 +114,16 @@
}
}
+ public int read(long position, byte[] b, int off, int len)
+ throws IOException {
+ ByteBuffer bb = ByteBuffer.wrap(b, off, len);
+ try {
+ return fis.getChannel().read(bb, position);
+ } catch (IOException e) {
+ throw new FSError(e);
+ }
+ }
+
public long skip(long n) throws IOException { return fis.skip(n); }
}
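
The local implementation leans on java.nio: FileChannel.read(ByteBuffer, long) is an OS-level positional read (pread(2) on POSIX systems), so concurrent callers never contend over a shared file offset. A standalone illustration outside Hadoop (file name illustrative):

    import java.io.FileInputStream;
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;

    public class ChannelPread {
      public static void main(String[] args) throws IOException {
        FileInputStream fis = new FileInputStream("sample.dat");
        FileChannel ch = fis.getChannel();
        ByteBuffer bb = ByteBuffer.allocate(64);
        // Read up to 64 bytes starting at offset 128 without touching
        // the channel's own position.
        int n = ch.read(bb, 128L);
        System.out.println("read " + n + " bytes; pos still " + ch.position());
        fis.close();
      }
    }
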
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/PositionedReadable.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/PositionedReadable.java?view=auto&rev=452282
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/PositionedReadable.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/fs/PositionedReadable.java Mon Oct 2 17:17:59 2006
@@ -0,0 +1,45 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.io.*;
+import org.apache.hadoop.fs.*;
+
+/** Stream that permits positional reading. */
+public interface PositionedReadable {
+ /**
+   * Read up to the specified number of bytes, from a given
+ * position within a file, and return the number of bytes read. This does not
+ * change the current offset of a file, and is thread-safe.
+ */
+ public int read(long position, byte[] buffer, int offset, int length)
+ throws IOException;
+
+ /**
+ * Read the specified number of bytes, from a given
+ * position within a file. This does not
+ * change the current offset of a file, and is thread-safe.
+ */
+ public void readFully(long position, byte[] buffer, int offset, int length)
+ throws IOException;
+
+ /**
+   * Read a number of bytes equal to the length of the buffer, from a given
+ * position within a file. This does not
+ * change the current offset of a file, and is thread-safe.
+ */
+ public void readFully(long position, byte[] buffer) throws IOException;
+}
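
To make the contract concrete, here is a toy in-memory implementation (illustrative, not part of this commit): read may come up short near the end of the data, while readFully either fills the requested range or throws.

    import java.io.EOFException;
    import java.io.IOException;
    import org.apache.hadoop.fs.PositionedReadable;

    /** Toy PositionedReadable over a byte array (illustration only). */
    class ByteArrayPositionedReadable implements PositionedReadable {
      private final byte[] data;
      ByteArrayPositionedReadable(byte[] data) { this.data = data; }

      public int read(long position, byte[] buffer, int offset, int length)
        throws IOException {
        if (position >= data.length) { return -1; }            // past EOF
        int n = Math.min(length, data.length - (int)position);
        System.arraycopy(data, (int)position, buffer, offset, n);
        return n;                                              // may be short
      }

      public void readFully(long position, byte[] buffer, int offset, int length)
        throws IOException {
        int nread = 0;
        while (nread < length) {
          int n = read(position + nread, buffer, offset + nread, length - nread);
          if (n < 0) { throw new EOFException("EOF before reading fully"); }
          nread += n;
        }
      }

      public void readFully(long position, byte[] buffer) throws IOException {
        readFully(position, buffer, 0, buffer.length);
      }
    }
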
Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestPread.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestPread.java?view=auto&rev=452282
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestPread.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestPread.java Mon Oct 2 17:17:59 2006
@@ -0,0 +1,125 @@
+package org.apache.hadoop.dfs;
+
+import junit.framework.TestCase;
+import java.io.*;
+import java.util.Random;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * This class tests the DFS positional read functionality in a single node
+ * mini-cluster.
+ * @author Milind Bhandarkar
+ */
+public class TestPread extends TestCase {
+ static final long seed = 0xDEADBEEFL;
+ static final int blockSize = 4096;
+
+ private void writeFile(FileSystem fileSys, Path name) throws IOException {
+ // create and write a file that contains three blocks of data
+ DataOutputStream stm = fileSys.create(name, true, 4096, (short)1,
+ (long)blockSize);
+ byte[] buffer = new byte[(int)(3*blockSize)];
+ Random rand = new Random(seed);
+ rand.nextBytes(buffer);
+ stm.write(buffer);
+ stm.close();
+ }
+
+ private void checkAndEraseData(byte[] actual, int from, byte[] expected, String message) {
+ for (int idx = 0; idx < actual.length; idx++) {
+      assertEquals(message+" byte "+(from+idx)+" differs. expected "+
+                   expected[from+idx]+" actual "+actual[idx],
+                   expected[from+idx], actual[idx]);
+ actual[idx] = 0;
+ }
+ }
+
+ private void doPread(FSDataInputStream stm, long position, byte[] buffer,
+ int offset, int length) throws IOException {
+ int nread = 0;
+ while (nread < length) {
+ int nbytes = stm.read(position+nread, buffer, offset+nread, length-nread);
+ assertTrue("Error in pread", nbytes > 0);
+ nread += nbytes;
+ }
+ }
+ private void pReadFile(FileSystem fileSys, Path name) throws IOException {
+ FSDataInputStream stm = fileSys.open(name);
+ byte[] expected = new byte[(int)(3*blockSize)];
+ Random rand = new Random(seed);
+ rand.nextBytes(expected);
+ // do a sanity check. Read first 4K bytes
+ byte[] actual = new byte[4096];
+ stm.readFully(actual);
+ checkAndEraseData(actual, 0, expected, "Read Sanity Test");
+ // now do a pread for the first 8K bytes
+ actual = new byte[8192];
+ doPread(stm, 0L, actual, 0, 8192);
+ checkAndEraseData(actual, 0, expected, "Pread Test 1");
+ // Now check to see if the normal read returns 4K-8K byte range
+ actual = new byte[4096];
+ stm.readFully(actual);
+ checkAndEraseData(actual, 4096, expected, "Pread Test 2");
+ // Now see if we can cross a single block boundary successfully
+ // read 4K bytes from blockSize - 2K offset
+ stm.readFully(blockSize - 2048, actual, 0, 4096);
+ checkAndEraseData(actual, (int)(blockSize-2048), expected, "Pread Test 3");
+ // now see if we can cross two block boundaries successfully
+ // read blockSize + 4K bytes from blockSize - 2K offset
+ actual = new byte[(int)(blockSize+4096)];
+ stm.readFully(blockSize - 2048, actual);
+ checkAndEraseData(actual, (int)(blockSize-2048), expected, "Pread Test 4");
+ // now check that even after all these preads, we can still read
+ // bytes 8K-12K
+ actual = new byte[4096];
+ stm.readFully(actual);
+ checkAndEraseData(actual, 8192, expected, "Pread Test 5");
+ // all done
+ stm.close();
+ }
+
+ private void cleanupFile(FileSystem fileSys, Path name) throws IOException {
+ assertTrue(fileSys.exists(name));
+ fileSys.delete(name);
+ assertTrue(!fileSys.exists(name));
+ }
+
+ /**
+ * Tests positional read in DFS.
+ */
+ public void testPreadDFS() throws IOException {
+ Configuration conf = new Configuration();
+ MiniDFSCluster cluster = new MiniDFSCluster(65312, conf, false);
+ FileSystem fileSys = cluster.getFileSystem();
+ try {
+ Path file1 = new Path("preadtest.dat");
+ writeFile(fileSys, file1);
+ pReadFile(fileSys, file1);
+ cleanupFile(fileSys, file1);
+ } finally {
+ fileSys.close();
+ cluster.shutdown();
+ }
+ }
+
+ /**
+ * Tests positional read in LocalFS.
+ */
+ public void testPreadLocalFS() throws IOException {
+ Configuration conf = new Configuration();
+ FileSystem fileSys = FileSystem.getNamed("local", conf);
+ try {
+ Path file1 = new Path("build/test/data", "preadtest.dat");
+ writeFile(fileSys, file1);
+ pReadFile(fileSys, file1);
+ cleanupFile(fileSys, file1);
+ } finally {
+ fileSys.close();
+ }
+ }
+}