Posted to common-commits@hadoop.apache.org by dh...@apache.org on 2010/07/01 10:37:30 UTC
svn commit: r959555 - in /hadoop/common/branches/branch-0.20-append: ./
src/core/org/apache/hadoop/io/ src/core/org/apache/hadoop/util/
src/hdfs/org/apache/hadoop/hdfs/server/datanode/
src/hdfs/org/apache/hadoop/hdfs/server/namenode/ src/test/org/apach...
Author: dhruba
Date: Thu Jul 1 08:37:30 2010
New Revision: 959555
URL: http://svn.apache.org/viewvc?rev=959555&view=rev
Log:
HDFS-1057. Concurrent readers hit ChecksumExceptions if following
a writer to very end of file (Sam Rash via dhruba)
Added:
hadoop/common/branches/branch-0.20-append/src/core/org/apache/hadoop/util/ChecksumUtil.java
hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestFileConcurrentReader.java
Modified:
hadoop/common/branches/branch-0.20-append/CHANGES.txt
hadoop/common/branches/branch-0.20-append/src/core/org/apache/hadoop/io/IOUtils.java
hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
Modified: hadoop/common/branches/branch-0.20-append/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/CHANGES.txt?rev=959555&r1=959554&r2=959555&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-append/CHANGES.txt Thu Jul 1 08:37:30 2010
@@ -19,6 +19,9 @@ Release 0.20-append - Unreleased
HDFS-457. Better handling of volume failure in DataNode Storage.
(Nicolas Spiegelberg via dhruba)
+ HDFS-1057. Concurrent readers hit ChecksumExceptions if following
+ a writer to very end of file (Sam Rash via dhruba)
+
IMPROVEMENTS
BUG FIXES
Modified: hadoop/common/branches/branch-0.20-append/src/core/org/apache/hadoop/io/IOUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/core/org/apache/hadoop/io/IOUtils.java?rev=959555&r1=959554&r2=959555&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/core/org/apache/hadoop/io/IOUtils.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/core/org/apache/hadoop/io/IOUtils.java Thu Jul 1 08:37:30 2010
@@ -20,6 +20,8 @@ package org.apache.hadoop.io;
import java.io.*;
import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
import org.apache.commons.logging.Log;
@@ -105,6 +107,28 @@ public class IOUtils {
off += ret;
}
}
+
+ /** Reads len bytes in a loop from a FileChannel.
+ * @param fileChannel the FileChannel to read len bytes into buf from
+ * @param buf the buffer to fill
+ * @param off offset into the buffer
+ * @param len the number of bytes to read
+ * @throws IOException if it could not read the requested number of bytes
+ * for any reason (including EOF)
+ */
+ public static void readFileChannelFully( FileChannel fileChannel, byte buf[],
+ int off, int len ) throws IOException {
+ int toRead = len;
+ ByteBuffer byteBuffer = ByteBuffer.wrap(buf, off, len);
+ while ( toRead > 0 ) {
+ int ret = fileChannel.read(byteBuffer);
+ if ( ret < 0 ) {
+ throw new IOException( "Premeture EOF from inputStream");
+ }
+ toRead -= ret;
+ off += ret;
+ }
+ }
/** Similar to readFully(). Skips bytes in a loop.
* @param in The InputStream to skip bytes from
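A minimal usage sketch of the new helper (not part of the commit; the file path is hypothetical). readFileChannelFully reads from the channel's current position and throws IOException if fewer than len bytes are available:

    import java.io.FileInputStream;
    import java.nio.channels.FileChannel;
    import org.apache.hadoop.io.IOUtils;

    public class ReadFileChannelFullyExample {
      public static void main(String[] args) throws Exception {
        // hypothetical local block file
        FileInputStream in = new FileInputStream("/tmp/blk_1234");
        try {
          FileChannel channel = in.getChannel();
          byte[] buf = new byte[512];
          // fills buf[0..512) or throws IOException (including on EOF)
          IOUtils.readFileChannelFully(channel, buf, 0, buf.length);
        } finally {
          in.close();
        }
      }
    }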
Added: hadoop/common/branches/branch-0.20-append/src/core/org/apache/hadoop/util/ChecksumUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/core/org/apache/hadoop/util/ChecksumUtil.java?rev=959555&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/core/org/apache/hadoop/util/ChecksumUtil.java (added)
+++ hadoop/common/branches/branch-0.20-append/src/core/org/apache/hadoop/util/ChecksumUtil.java Thu Jul 1 08:37:30 2010
@@ -0,0 +1,40 @@
+package org.apache.hadoop.util;
+
+import java.io.IOException;
+
+public class ChecksumUtil {
+ /**
+ * Updates the per-chunk checksums for a buffer.
+ *
+ * @param buf - buffer holding both the checksums and the data
+ * @param checksumOff - offset in the buffer where the checksums are to be updated
+ * @param dataOff - offset in the buffer of the data
+ * @param dataLen - length of data to compute checksums on
+ * @param checksum - the DataChecksum used to compute each chunk's checksum
+ */
+ public static void updateChunkChecksum(
+ byte[] buf,
+ int checksumOff,
+ int dataOff,
+ int dataLen,
+ DataChecksum checksum
+ ) throws IOException {
+ int bytesPerChecksum = checksum.getBytesPerChecksum();
+ int checksumSize = checksum.getChecksumSize();
+ int curChecksumOff = checksumOff;
+ int curDataOff = dataOff;
+ int numChunks = (dataLen + bytesPerChecksum - 1) / bytesPerChecksum;
+ int dataLeft = dataLen;
+
+ for (int i = 0; i < numChunks; i++) {
+ int len = Math.min(dataLeft, bytesPerChecksum);
+
+ checksum.reset();
+ checksum.update(buf, curDataOff, len);
+ checksum.writeValue(buf, curChecksumOff, false);
+
+ curDataOff += len;
+ curChecksumOff += checksumSize;
+ dataLeft -= len;
+ }
+ }
+}
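For context, a sketch of how updateChunkChecksum is intended to be driven, assuming the same packet layout BlockSender uses (all checksums first, then the data); the offsets and sizes here are hypothetical:

    import org.apache.hadoop.util.ChecksumUtil;
    import org.apache.hadoop.util.DataChecksum;

    public class UpdateChunkChecksumExample {
      public static void main(String[] args) throws Exception {
        int bytesPerChecksum = 512;
        DataChecksum checksum = DataChecksum.newDataChecksum(
            DataChecksum.CHECKSUM_CRC32, bytesPerChecksum);
        int dataLen = 1200; // three chunks; the last one is partial
        int numChunks = (dataLen + bytesPerChecksum - 1) / bytesPerChecksum;
        int checksumOff = 0;
        int dataOff = checksumOff + numChunks * checksum.getChecksumSize();
        byte[] buf = new byte[dataOff + dataLen];
        // ... fill buf[dataOff, dataOff + dataLen) with block data ...
        // recompute the three CRCs in place, chunk by chunk
        ChecksumUtil.updateChunkChecksum(buf, checksumOff, dataOff, dataLen, checksum);
      }
    }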
Modified: hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=959555&r1=959554&r2=959555&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Thu Jul 1 08:37:30 2010
@@ -480,7 +480,10 @@ class BlockReceiver implements java.io.C
/// flush entire packet before sending ack
flush();
-
+
+ // update length only after flush to disk
+ datanode.data.setVisibleLength(block, offsetInBlock);
+
// put in queue for pending acks
if (responder != null) {
((PacketResponder)responder.getRunnable()).enqueue(seqno,
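The ordering in this hunk is the heart of the fix: the visible length is advanced only after flush(), so a concurrent reader that observes the new length is guaranteed to find fully flushed bytes on disk. A stripped-down sketch of the pattern (hypothetical names, not the commit's code):

    class PublishAfterFlushSketch {
      private volatile long visibleLength; // what concurrent readers may read up to

      void writePacket(java.io.OutputStream out, byte[] packet, long offsetInBlock)
          throws java.io.IOException {
        out.write(packet);
        out.flush();                    // data and checksums hit the disk first
        visibleLength = offsetInBlock;  // only then is the new length published
      }

      long readableBytes() {
        return visibleLength;           // safe upper bound for a tailing reader
      }
    }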
Modified: hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=959555&r1=959554&r2=959555&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Thu Jul 1 08:37:30 2010
@@ -35,6 +35,7 @@ import org.apache.hadoop.hdfs.protocol.B
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.SocketOutputStream;
+import org.apache.hadoop.util.ChecksumUtil;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.StringUtils;
@@ -64,6 +65,7 @@ class BlockSender implements java.io.Clo
private boolean verifyChecksum; //if true, check is verified while reading
private BlockTransferThrottler throttler;
private final String clientTraceFmt; // format of client trace log message
+ private final MemoizedBlock memoizedBlock;
/**
* Minimum buffer used while sending data to clients. Used only if
@@ -89,7 +91,7 @@ class BlockSender implements java.io.Clo
this.chunkOffsetOK = chunkOffsetOK;
this.corruptChecksumOk = corruptChecksumOk;
this.verifyChecksum = verifyChecksum;
- this.blockLength = datanode.data.getLength(block);
+ this.blockLength = datanode.data.getVisibleLength(block);
this.transferToAllowed = datanode.transferToAllowed;
this.clientTraceFmt = clientTraceFmt;
@@ -164,6 +166,7 @@ class BlockSender implements java.io.Clo
seqno = 0;
blockIn = datanode.data.getBlockInputStream(block, offset); // seek to offset
+ memoizedBlock = new MemoizedBlock(blockIn, blockLength, datanode.data, block);
} catch (IOException ioe) {
IOUtils.closeStream(this);
IOUtils.closeStream(blockIn);
@@ -234,6 +237,15 @@ class BlockSender implements java.io.Clo
int len = Math.min((int) (endOffset - offset),
bytesPerChecksum*maxChunks);
+
+ // truncate len so that any partial chunk is sent as its own, final packet.
+ // this is not necessary for correctness, but partial chunks are the ones
+ // whose checksums may be recomputed and sent via buffer copy, so try to
+ // minimize those bytes
+ if (len > bytesPerChecksum && len % bytesPerChecksum != 0) {
+ len -= len % bytesPerChecksum;
+ }
+
if (len == 0) {
return 0;
}
@@ -298,32 +310,54 @@ class BlockSender implements java.io.Clo
cOff += checksumSize;
}
}
- //writing is done below (mainly to handle IOException)
- }
-
- try {
- if (blockInPosition >= 0) {
+
+ // only recompute checksum if we can't trust the meta data due to
+ // concurrent writes
+ if (memoizedBlock.hasBlockChanged(len)) {
+ ChecksumUtil.updateChunkChecksum(
+ buf, checksumOff, dataOff, len, checksum
+ );
+ }
+
+ try {
+ out.write(buf, 0, dataOff + len);
+ } catch (IOException e) {
+ throw ioeToSocketException(e);
+ }
+ } else {
+ try {
//use transferTo(). Checks on out and blockIn are already done.
+ SocketOutputStream sockOut = (SocketOutputStream) out;
+ FileChannel fileChannel = ((FileInputStream) blockIn).getChannel();
- SocketOutputStream sockOut = (SocketOutputStream)out;
- //first write the packet
- sockOut.write(buf, 0, dataOff);
- // no need to flush. since we know out is not a buffered stream.
-
- sockOut.transferToFully(((FileInputStream)blockIn).getChannel(),
- blockInPosition, len);
+ if (memoizedBlock.hasBlockChanged(len)) {
+ fileChannel.position(blockInPosition);
+ IOUtils.readFileChannelFully(
+ fileChannel,
+ buf,
+ dataOff,
+ len
+ );
+
+ ChecksumUtil.updateChunkChecksum(
+ buf, checksumOff, dataOff, len, checksum
+ );
+ sockOut.write(buf, 0, dataOff + len);
+ } else {
+ //first write the packet
+ sockOut.write(buf, 0, dataOff);
+ // no need to flush. since we know out is not a buffered stream.
+ sockOut.transferToFully(fileChannel, blockInPosition, len);
+ }
blockInPosition += len;
- } else {
- // normal transfer
- out.write(buf, 0, dataOff + len);
- }
-
- } catch (IOException e) {
+
+ } catch (IOException e) {
/* exception while writing to the client (well, with transferTo(),
* it could also be while reading from the local file).
*/
- throw ioeToSocketException(e);
+ throw ioeToSocketException(e);
+ }
}
if (throttler != null) { // rebalancing so throttle
@@ -382,12 +416,13 @@ class BlockSender implements java.io.Clo
streamForSendChunks = baseStream;
// ensure a minimum buffer size.
- maxChunksPerPacket = (Math.max(BUFFER_SIZE,
+ maxChunksPerPacket = (Math.max(BUFFER_SIZE,
MIN_BUFFER_WITH_TRANSFERTO)
+ bytesPerChecksum - 1)/bytesPerChecksum;
- // allocate smaller buffer while using transferTo().
- pktSize += checksumSize * maxChunksPerPacket;
+ // packet buffer has to be able to do a normal transfer in the case
+ // of recomputing checksum
+ pktSize += (bytesPerChecksum + checksumSize) * maxChunksPerPacket;
} else {
maxChunksPerPacket = Math.max(1,
(BUFFER_SIZE + bytesPerChecksum - 1)/bytesPerChecksum);
@@ -410,7 +445,13 @@ class BlockSender implements java.io.Clo
} catch (IOException e) { //socket error
throw ioeToSocketException(e);
}
- } finally {
+ }
+ catch (RuntimeException e) {
+ LOG.error("unexpected exception sending block", e);
+
+ throw new IOException("unexpected runtime exception", e);
+ }
+ finally {
if (clientTraceFmt != null) {
ClientTraceLog.info(String.format(clientTraceFmt, totalRead));
}
@@ -425,4 +466,49 @@ class BlockSender implements java.io.Clo
boolean isBlockReadFully() {
return blockReadFully;
}
+
+ /**
+ * Helper class used to track whether a block's metadata is still verifiable
+ */
+ private class MemoizedBlock {
+ // block data stream
+ private InputStream inputStream;
+ // visible block length
+ private long blockLength;
+ private final FSDatasetInterface fsDataset;
+ private final Block block;
+
+ private MemoizedBlock(
+ InputStream inputStream,
+ long blockLength,
+ FSDatasetInterface fsDataset, Block block) {
+ this.inputStream = inputStream;
+ this.blockLength = blockLength;
+ this.fsDataset = fsDataset;
+ this.block = block;
+ }
+
+ // logic: if we are starting or ending on a partial chunk and the block
+ // has more data than we were told at construction, the block has 'changed'
+ // in a way that we care about (i.e., we can't trust the on-disk CRC data)
+ boolean hasBlockChanged(long dataLen) throws IOException {
+ // check whether we are using transferTo, since that determines how we
+ // detect whether the file has changed
+ // (blockInPosition >= 0 => we are using transferTo and FileChannels)
+ if (BlockSender.this.blockInPosition >= 0) {
+ long currentLength = ((FileInputStream) inputStream).getChannel().size();
+
+ return (blockInPosition % bytesPerChecksum != 0 ||
+ dataLen % bytesPerChecksum != 0) &&
+ currentLength > blockLength;
+
+ } else {
+ long currentLength = fsDataset.getLength(block);
+
+ // offset is the offset into the block
+ return (BlockSender.this.offset % bytesPerChecksum != 0 ||
+ dataLen % bytesPerChecksum != 0) &&
+ currentLength > blockLength;
+ }
+ }
+ }
}
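Distilled from MemoizedBlock.hasBlockChanged(): checksums need recomputing only when the packet touches a partial chunk and the on-disk block has grown past the visible length captured at construction, because only a partial chunk of a still-growing block can carry a stale CRC. A standalone rendering of that predicate (an illustrative helper, not in the commit):

    public class ChecksumTrustSketch {
      static boolean mustRecomputeChecksum(long offset, long dataLen,
          int bytesPerChecksum, long lengthAtConstruction, long currentLength) {
        boolean touchesPartialChunk = offset % bytesPerChecksum != 0
            || dataLen % bytesPerChecksum != 0;
        boolean blockGrew = currentLength > lengthAtConstruction;
        return touchesPartialChunk && blockGrew;
      }

      public static void main(String[] args) {
        // chunk-aligned read of a stable block: trust the on-disk CRC
        System.out.println(mustRecomputeChecksum(0, 1024, 512, 4096, 4096)); // false
        // read ending mid-chunk while the block grew: recompute
        System.out.println(mustRecomputeChecksum(0, 700, 512, 4096, 5000));  // true
      }
    }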
Modified: hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=959555&r1=959554&r2=959555&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Thu Jul 1 08:37:30 2010
@@ -709,14 +709,15 @@ public class FSDataset implements FSCons
//Find better place?
public static final String METADATA_EXTENSION = ".meta";
public static final short METADATA_VERSION = 1;
-
+
static class ActiveFile {
final File file;
final List<Thread> threads = new ArrayList<Thread>(2);
+ private volatile long visibleLength;
ActiveFile(File f, List<Thread> list) {
- file = f;
+ this(f);
if (list != null) {
threads.addAll(list);
}
@@ -726,13 +727,22 @@ public class FSDataset implements FSCons
// no active threads associated with this ActiveFile
ActiveFile(File f) {
file = f;
+ visibleLength = f.length();
}
-
+
+ public long getVisibleLength() {
+ return visibleLength;
+ }
+
+ public void setVisibleLength(long value) {
+ visibleLength = value;
+ }
+
public String toString() {
return getClass().getSimpleName() + "(file=" + file
+ ", threads=" + threads + ")";
}
- }
+ }
static String getMetaFileName(String blockFileName, long genStamp) {
return blockFileName + "_" + genStamp + METADATA_EXTENSION;
@@ -782,7 +792,7 @@ public class FSDataset implements FSCons
}
/** Return the block file for the given ID */
- public File findBlockFile(long blockId) {
+ public synchronized File findBlockFile(long blockId) {
final Block b = new Block(blockId);
File blockfile = null;
ActiveFile activefile = ongoingCreates.get(b);
@@ -808,7 +818,8 @@ public class FSDataset implements FSCons
return null;
}
File metafile = findMetaFile(blockfile);
- return new Block(blkid, blockfile.length(),
+ Block block = new Block(blkid);
+ return new Block(blkid, getVisibleLength(block),
parseGenerationStamp(blockfile, metafile));
}
@@ -883,6 +894,31 @@ public class FSDataset implements FSCons
return getBlockFile(b).length();
}
+ @Override
+ public synchronized long getVisibleLength(Block b) throws IOException {
+ ActiveFile activeFile = ongoingCreates.get(b);
+
+ if (activeFile != null) {
+ return activeFile.getVisibleLength();
+ } else {
+ return getLength(b);
+ }
+ }
+
+ @Override
+ public synchronized void setVisibleLength(Block b, long length)
+ throws IOException {
+ ActiveFile activeFile = ongoingCreates.get(b);
+
+ if (activeFile != null) {
+ activeFile.setVisibleLength(length);
+ } else {
+ throw new IOException(
+ String.format("block %s is not being written to", b)
+ );
+ }
+ }
+
/**
* Get File name for a given block.
*/
Modified: hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java?rev=959555&r1=959554&r2=959555&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java Thu Jul 1 08:37:30 2010
@@ -40,7 +40,7 @@ import org.apache.hadoop.util.DiskChecke
*
*/
public interface FSDatasetInterface extends FSDatasetMBean {
-
+
/**
* Returns the length of the metadata file of the specified block
@@ -49,7 +49,7 @@ public interface FSDatasetInterface exte
* @throws IOException
*/
public long getMetaDataLength(Block b) throws IOException;
-
+
/**
* This class provides the input stream and length of the metadata
* of a block
@@ -94,6 +94,25 @@ public interface FSDatasetInterface exte
public long getLength(Block b) throws IOException;
/**
+ * Returns the specified block's visible length (the length for which
+ * checksum metadata exists)
+ * @param b
+ * @return the specified block's visible length
+ * @throws IOException
+ */
+ public long getVisibleLength(Block b) throws IOException;
+
+ /**
+ * Update the specified block's visible length. NOTE: this only applies
+ * to blocks that are being written to; if called on a closed block, it
+ * throws an IOException.
+ *
+ * @param b block to update the length for
+ * @param length value to set visible length to
+ * @throws IOException if the block is not in ongoingCreates
+ */
+ public void setVisibleLength(Block b, long length) throws IOException;
+
+ /**
* @return the generation stamp stored with the block.
*/
public Block getStoredBlock(long blkid) throws IOException;
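A hedged usage sketch of the two new interface methods (the helper, dataset variable, and block id are hypothetical):

    import java.io.IOException;
    import org.apache.hadoop.hdfs.protocol.Block;
    import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface;

    public class VisibleLengthUsageSketch {
      // advance the readable portion of a block by delta bytes
      static void advance(FSDatasetInterface dataset, Block b, long delta)
          throws IOException {
        long readable = dataset.getVisibleLength(b); // getLength() for finalized blocks
        dataset.setVisibleLength(b, readable + delta); // IOException unless b is being written
      }
    }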
Modified: hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=959555&r1=959554&r2=959555&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Thu Jul 1 08:37:30 2010
@@ -831,18 +831,28 @@ public class FSNamesystem implements FSC
blocks[curBlk] + "blockMap has " + numCorruptNodes +
" but corrupt replicas map has " + numCorruptReplicas);
}
- boolean blockCorrupt = (numCorruptNodes == numNodes);
- int numMachineSet = blockCorrupt ? numNodes :
+ DatanodeDescriptor[] machineSet = null;
+ boolean blockCorrupt = false;
+ if (inode.isUnderConstruction() && curBlk == blocks.length - 1
+ && blocksMap.numNodes(blocks[curBlk]) == 0) {
+ // get unfinished block locations
+ INodeFileUnderConstruction cons = (INodeFileUnderConstruction)inode;
+ machineSet = cons.getTargets();
+ blockCorrupt = false;
+ } else {
+ blockCorrupt = (numCorruptNodes == numNodes);
+ int numMachineSet = blockCorrupt ? numNodes :
(numNodes - numCorruptNodes);
- DatanodeDescriptor[] machineSet = new DatanodeDescriptor[numMachineSet];
- if (numMachineSet > 0) {
- numNodes = 0;
- for(Iterator<DatanodeDescriptor> it =
- blocksMap.nodeIterator(blocks[curBlk]); it.hasNext();) {
- DatanodeDescriptor dn = it.next();
- boolean replicaCorrupt = corruptReplicas.isReplicaCorrupt(blocks[curBlk], dn);
- if (blockCorrupt || (!blockCorrupt && !replicaCorrupt))
- machineSet[numNodes++] = dn;
+ machineSet = new DatanodeDescriptor[numMachineSet];
+ if (numMachineSet > 0) {
+ numNodes = 0;
+ for(Iterator<DatanodeDescriptor> it =
+ blocksMap.nodeIterator(blocks[curBlk]); it.hasNext();) {
+ DatanodeDescriptor dn = it.next();
+ boolean replicaCorrupt = corruptReplicas.isReplicaCorrupt(blocks[curBlk], dn);
+ if (blockCorrupt || (!blockCorrupt && !replicaCorrupt))
+ machineSet[numNodes++] = dn;
+ }
}
}
results.add(new LocatedBlock(blocks[curBlk], machineSet, curPos,
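In effect, getBlockLocations now special-cases the last block of a file under construction: when no datanode has yet reported the block (blocksMap.numNodes(...) == 0), the locations are taken from the write pipeline's targets recorded on the INodeFileUnderConstruction, so a tailing reader can reach the datanodes holding the unfinished block instead of receiving an empty machine set.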
Added: hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestFileConcurrentReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestFileConcurrentReader.java?rev=959555&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestFileConcurrentReader.java (added)
+++ hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestFileConcurrentReader.java Thu Jul 1 08:37:30 2010
@@ -0,0 +1,402 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.junit.Before;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+
+/**
+ * This class tests concurrent reads and writes to a file;
+ * i.e., one writer writes while one or more readers see unfinished blocks
+ */
+public class TestFileConcurrentReader extends junit.framework.TestCase {
+
+ private enum SyncType
+ {
+ SYNC,
+ APPEND,
+ }
+
+
+ private static final Logger LOG =
+ Logger.getLogger(TestFileConcurrentReader.class);
+
+ {
+ ((Log4JLogger) LeaseManager.LOG).getLogger().setLevel(Level.ALL);
+ ((Log4JLogger) FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
+ ((Log4JLogger) DFSClient.LOG).getLogger().setLevel(Level.ALL);
+ }
+
+ static final long seed = 0xDEADBEEFL;
+ static final int blockSize = 8192;
+ private static final int DEFAULT_WRITE_SIZE = 1024 + 1;
+ private static final int SMALL_WRITE_SIZE = 61;
+ private Configuration conf;
+ private MiniDFSCluster cluster;
+ private FileSystem fileSystem;
+
+ // creates a file but does not close it
+ private FSDataOutputStream createFile(FileSystem fileSys, Path name, int repl)
+ throws IOException {
+ System.out.println("createFile: Created " + name + " with " + repl + " replica.");
+ FSDataOutputStream stm = fileSys.create(name, true,
+ fileSys.getConf().getInt("io.file.buffer.size", 4096),
+ (short) repl, (long) blockSize);
+ return stm;
+ }
+
+ @Before
+ protected void setUp() throws IOException {
+ conf = new Configuration();
+ init(conf);
+ }
+
+ private void init(Configuration conf) throws IOException {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ cluster = new MiniDFSCluster(conf, 1, true, null);
+ cluster.waitClusterUp();
+ fileSystem = cluster.getFileSystem();
+ }
+
+ private void writeFileAndSync(FSDataOutputStream stm, int size)
+ throws IOException {
+// byte[] buffer = AppendTestUtil.randomBytes(seed, size);
+ byte[] buffer = generateSequentialBytes(0, size);
+ stm.write(buffer, 0, size);
+ stm.sync();
+ }
+
+ private void checkCanRead(FileSystem fileSys, Path path, int numBytes)
+ throws IOException {
+ waitForBlocks(fileSys, path);
+ assertBytesAvailable(fileSys, path, numBytes);
+ }
+
+ // make sure bytes are available and match expected
+ private void assertBytesAvailable(
+ FileSystem fileSystem,
+ Path path,
+ int numBytes
+ ) throws IOException {
+ byte[] buffer = new byte[numBytes];
+ FSDataInputStream inputStream = fileSystem.open(path);
+ IOUtils.readFully(inputStream, buffer, 0, numBytes);
+
+ assertTrue(
+ "unable to validate bytes",
+ validateSequentialBytes(buffer, 0, numBytes)
+ );
+ }
+
+ private void waitForBlocks(FileSystem fileSys, Path name)
+ throws IOException {
+ // wait until we have at least one block in the file to read.
+ boolean done = false;
+
+ while (!done) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ }
+ done = true;
+ BlockLocation[] locations = fileSys.getFileBlockLocations(
+ fileSys.getFileStatus(name), 0, blockSize);
+ if (locations.length < 1) {
+ done = false;
+ continue;
+ }
+ }
+ }
+
+ /**
+ * Test that that writes to an incomplete block are available to a reader
+ */
+ public void testUnfinishedBlockRead()
+ throws IOException {
+ try {
+ // check that / exists
+ Path path = new Path("/");
+ System.out.println("Path : \"" + path.toString() + "\"");
+ System.out.println(fileSystem.getFileStatus(path).isDir());
+ assertTrue("/ should be a directory",
+ fileSystem.getFileStatus(path).isDir());
+
+ // create a new file in the root, write data, do not close
+ Path file1 = new Path("/unfinished-block");
+ FSDataOutputStream stm = TestFileCreation.createFile(fileSystem, file1, 1);
+
+ // write partial block and sync
+ int partialBlockSize = blockSize / 2;
+ writeFileAndSync(stm, partialBlockSize);
+
+ // Make sure a client can read it before it is closed
+ checkCanRead(fileSystem, file1, partialBlockSize);
+
+ stm.close();
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
+ /**
+ * test case: if the BlockSender decides there is only one packet to send,
+ * the previous computation of the pktSize based on transferToAllowed
+ * would result in too small a buffer to do the buffer-copy needed
+ * for partial chunks.
+ */
+ public void testUnfinishedBlockPacketBufferOverrun() throws IOException {
+ try {
+ // check that / exists
+ Path path = new Path("/");
+ System.out.println("Path : \"" + path.toString() + "\"");
+ System.out.println(fileSystem.getFileStatus(path).isDir());
+ assertTrue("/ should be a directory",
+ fileSystem.getFileStatus(path).isDir());
+
+ // create a new file in the root, write data, do not close
+ Path file1 = new Path("/unfinished-block");
+ final FSDataOutputStream stm =
+ TestFileCreation.createFile(fileSystem, file1, 1);
+
+ // write partial block and sync
+ final int bytesPerChecksum = conf.getInt("io.bytes.per.checksum", 512);
+ final int partialBlockSize = bytesPerChecksum - 1;
+
+ writeFileAndSync(stm, partialBlockSize);
+
+ // Make sure a client can read it before it is closed
+ checkCanRead(fileSystem, file1, partialBlockSize);
+
+ stm.close();
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
+ // for some reason, using transferTo provokes the race condition more often,
+ // so test it separately
+ public void testUnfinishedBlockCRCErrorTransferTo() throws IOException {
+ runTestUnfinishedBlockCRCError(true, SyncType.SYNC, DEFAULT_WRITE_SIZE);
+ }
+
+ public void testUnfinishedBlockCRCErrorTransferToVerySmallWrite()
+ throws IOException {
+ runTestUnfinishedBlockCRCError(true, SyncType.SYNC, SMALL_WRITE_SIZE);
+ }
+
+ // fails due to an issue with append; disabled
+ public void _testUnfinishedBlockCRCErrorTransferToAppend() throws IOException {
+ runTestUnfinishedBlockCRCError(true, SyncType.APPEND, DEFAULT_WRITE_SIZE);
+ }
+
+ public void testUnfinishedBlockCRCErrorNormalTransfer() throws IOException {
+ runTestUnfinishedBlockCRCError(false, SyncType.SYNC, DEFAULT_WRITE_SIZE);
+ }
+
+ public void testUnfinishedBlockCRCErrorNormalTransferVerySmallWrite()
+ throws IOException {
+ runTestUnfinishedBlockCRCError(false, SyncType.SYNC, SMALL_WRITE_SIZE);
+ }
+
+ // fails due to an issue with append; disabled
+ public void _testUnfinishedBlockCRCErrorNormalTransferAppend()
+ throws IOException {
+ runTestUnfinishedBlockCRCError(false, SyncType.APPEND, DEFAULT_WRITE_SIZE);
+ }
+
+ private void runTestUnfinishedBlockCRCError(
+ final boolean transferToAllowed, SyncType syncType, int writeSize
+ ) throws IOException {
+ runTestUnfinishedBlockCRCError(
+ transferToAllowed, syncType, writeSize, new Configuration()
+ );
+ }
+
+ private void runTestUnfinishedBlockCRCError(
+ final boolean transferToAllowed,
+ final SyncType syncType,
+ final int writeSize,
+ Configuration conf
+ ) throws IOException {
+ try {
+ conf.setBoolean("dfs.support.append", syncType == SyncType.APPEND);
+ conf.setBoolean("dfs.datanode.transferTo.allowed", transferToAllowed);
+ init(conf);
+
+ final Path file = new Path("/block-being-written-to");
+ final int numWrites = 2000;
+ final AtomicBoolean writerDone = new AtomicBoolean(false);
+ final AtomicBoolean writerStarted = new AtomicBoolean(false);
+ final AtomicBoolean error = new AtomicBoolean(false);
+ final FSDataOutputStream initialOutputStream = fileSystem.create(file);
+ Thread writer = new Thread(new Runnable() {
+ private FSDataOutputStream outputStream = initialOutputStream;
+
+ @Override
+ public void run() {
+ try {
+ for (int i = 0; !error.get() && i < numWrites; i++) {
+ try {
+ final byte[] writeBuf =
+ generateSequentialBytes(i * writeSize, writeSize);
+ outputStream.write(writeBuf);
+ if (syncType == SyncType.SYNC) {
+ outputStream.sync();
+ } else { // append
+ outputStream.close();
+ outputStream = fileSystem.append(file);
+ }
+ writerStarted.set(true);
+ } catch (IOException e) {
+ error.set(true);
+ LOG.error(String.format("error writing to file"));
+ }
+ }
+
+ outputStream.close();
+ writerDone.set(true);
+ } catch (Exception e) {
+ LOG.error("error in writer", e);
+
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ Thread tailer = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ long startPos = 0;
+ while (!writerDone.get() && !error.get()) {
+ if (writerStarted.get()) {
+ try {
+ startPos = tailFile(file, startPos);
+ } catch (IOException e) {
+ LOG.error(String.format("error tailing file %s", file), e);
+
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ } catch (RuntimeException e) {
+ if (e.getCause() instanceof ChecksumException) {
+ error.set(true);
+ }
+
+ LOG.error("error in tailer", e);
+ throw e;
+ }
+ }
+ });
+
+ writer.start();
+ tailer.start();
+
+ try {
+ writer.join();
+ tailer.join();
+
+ assertFalse(
+ "error occurred, see log above", error.get()
+ );
+ } catch (InterruptedException e) {
+ LOG.info("interrupted waiting for writer or tailer to complete");
+
+ Thread.currentThread().interrupt();
+ }
+ initialOutputStream.close();
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
+ private byte[] generateSequentialBytes(int start, int length) {
+ byte[] result = new byte[length];
+
+ for (int i = 0; i < length; i++) {
+ result[i] = (byte)((start + i) % 127);
+ }
+
+ return result;
+ }
+
+ private boolean validateSequentialBytes(byte[] buf, int startPos, int len) {
+ for (int i = 0; i < len; i++) {
+ int expected = (i + startPos) % 127;
+
+ if (buf[i] % 127 != expected) {
+ LOG.error(String.format("at position [%d], got [%d] and expected [%d]",
+ startPos, buf[i], expected));
+
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ private long tailFile(Path file, long startPos) throws IOException {
+ long numRead = 0;
+ FSDataInputStream inputStream = fileSystem.open(file);
+ inputStream.seek(startPos);
+
+ int len = 4 * 1024;
+ byte[] buf = new byte[len];
+ int read;
+ while ((read = inputStream.read(buf)) > -1) {
+ LOG.info(String.format("read %d bytes", read));
+
+ if (!validateSequentialBytes(buf, (int) (startPos + numRead), read)) {
+ LOG.error(String.format("invalid bytes: [%s]\n", Arrays.toString(buf)));
+ throw new ChecksumException(
+ String.format("unable to validate bytes"),
+ startPos
+ );
+ }
+
+ numRead += read;
+ }
+
+ inputStream.close();
+ return numRead + startPos - 1;
+ }
+
+ public void _testClientReadsPastBlockEnd() throws IOException {
+ // todo : client has length greater than server--should handle with
+ // exception, not with corrupt data!!!
+ }
+}
\ No newline at end of file
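Assuming the stock 0.20 Ant build, the new test should be runnable on its own via something like "ant test -Dtestcase=TestFileConcurrentReader" (the exact target invocation is an assumption, not stated in this commit).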
Modified: hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java?rev=959555&r1=959554&r2=959555&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java Thu Jul 1 08:37:30 2010
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.datanode;
+import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -66,6 +67,7 @@ public class SimulatedFSDataset impleme
Configuration conf = null;
static byte[] nullCrcFileData;
+
{
DataChecksum checksum = DataChecksum.newDataChecksum( DataChecksum.
CHECKSUM_NULL, 16*1024 );
@@ -325,6 +327,16 @@ public class SimulatedFSDataset impleme
return binfo.getlength();
}
+ @Override
+ public long getVisibleLength(Block b) throws IOException {
+ return getLength(b);
+ }
+
+ @Override
+ public void setVisibleLength(Block b, long length) throws IOException {
+ //no-op
+ }
+
/** {@inheritDoc} */
public Block getStoredBlock(long blkid) throws IOException {
Block b = new Block(blkid);