Posted to hdfs-commits@hadoop.apache.org by ha...@apache.org on 2010/01/22 08:53:32 UTC
svn commit: r902024 - in /hadoop/hdfs/trunk: CHANGES.txt src/java/org/apache/hadoop/hdfs/DFSClient.java src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java
Author: hairong
Date: Fri Jan 22 07:53:27 2010
New Revision: 902024
URL: http://svn.apache.org/viewvc?rev=902024&view=rev
Log:
HDFS-877. Client-driven block verification not functioning. Contributed by Todd Lipcon.
Added:
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java
Modified:
hadoop/hdfs/trunk/CHANGES.txt
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=902024&r1=902023&r2=902024&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Fri Jan 22 07:53:27 2010
@@ -91,6 +91,9 @@
HDFS-885. Datanode toString() NPEs on null dnRegistration. (stevel)
+ HDFS-877. Client-driven block verification not functioning. (Todd
+ Lipcon via hairong)
+
Release 0.21.0 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=902024&r1=902023&r2=902024&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java Fri Jan 22 07:53:27 2010
@@ -1322,23 +1322,37 @@
*/
public static class BlockReader extends FSInputChecker {
- private Socket dnSock; //for now just sending checksumOk.
+ Socket dnSock; //for now just sending checksumOk.
private DataInputStream in;
private DataChecksum checksum;
+
+ /** offset in block of the last chunk received */
private long lastChunkOffset = -1;
private long lastChunkLen = -1;
private long lastSeqNo = -1;
+ /** offset in block where reader wants to actually read */
private long startOffset;
- private long firstChunkOffset;
+
+ /** offset in block of first chunk - may be less than startOffset
+ if startOffset is not chunk-aligned */
+ private final long firstChunkOffset;
+
private int bytesPerChecksum;
private int checksumSize;
+
+ /**
+ * The total number of bytes we need to transfer from the DN.
+ * This is the amount that the user has requested plus some padding
+ * at the beginning so that the read can begin on a chunk boundary.
+ */
+ private final long bytesNeededToFinish;
+
private boolean gotEOS = false;
byte[] skipBuf = null;
ByteBuffer checksumBytes = null;
int dataLeft = 0;
- boolean isLastPacket = false;
/* FSInputChecker interface */
@@ -1353,6 +1367,11 @@
public synchronized int read(byte[] buf, int off, int len)
throws IOException {
+ // This has to be set here, *before* the skip, since we can
+ // hit EOS during the skip, in the case that our entire read
+ // is smaller than the checksum chunk.
+ boolean eosBefore = gotEOS;
+
//for the first read, skip the extra bytes at the front.
if (lastChunkLen < 0 && startOffset > firstChunkOffset && len > 0) {
// Skip these bytes. But don't call this.skip()!
@@ -1366,7 +1385,6 @@
}
}
- boolean eosBefore = gotEOS;
int nRead = super.read(buf, off, len);
// if gotEOS was set in the previous read and checksum is enabled :
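Capturing eosBefore up front matters because the check that follows super.read() (context not shown in this hunk) fires the ack only on the call during which EOS is first observed. Roughly, paraphrasing the surrounding code:

    if (gotEOS && !eosBefore && nRead >= 0 && needChecksum()) {
      // EOS was first hit during this call (possibly inside the skip
      // above) and every chunk verified cleanly: ack the datanode.
      checksumOk(dnSock);
    }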
@@ -1444,13 +1462,8 @@
int len, byte[] checksumBuf)
throws IOException {
// Read one chunk.
-
if ( gotEOS ) {
- if ( startOffset < 0 ) {
- //This is mainly for debugging. can be removed.
- throw new IOException( "BlockRead: already got EOS or an error" );
- }
- startOffset = -1;
+ // Already hit EOF
return -1;
}
@@ -1460,6 +1473,10 @@
chunkOffset += lastChunkLen;
}
+ // pos is relative to the start of the first chunk of the read.
+ // chunkOffset is relative to the start of the block.
+ // This makes sure that the read passed from FSInputChecker is
+ // for the same chunk we expect to be reading from the DN.
if ( (pos + firstChunkOffset) != chunkOffset ) {
throw new IOException("Mismatch in pos : " + pos + " + " +
firstChunkOffset + " != " + chunkOffset);
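A worked example of this invariant, with hypothetical numbers and the default 512-byte checksum chunk:

    long firstChunkOffset = 512; // startOffset 700 aligned down to a chunk
    long pos = 512;              // FSInputChecker has consumed one chunk
    long chunkOffset = 1024;     // next chunk's offset within the block
    // 512 + 512 == 1024, so the check passes and no IOException is thrown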
@@ -1494,7 +1511,6 @@
}
lastSeqNo = seqno;
- isLastPacket = lastPacketInBlock;
dataLeft = dataLen;
adjustChecksumBytes(dataLen);
if (dataLen > 0) {
@@ -1542,16 +1558,41 @@
IOUtils.readFully(in, buf, offset, bytesToRead);
checksumBytes.get(checksumBuf, 0, checksumSize * checksumsToRead);
}
-
+
dataLeft -= bytesToRead;
assert dataLeft >= 0;
lastChunkOffset = chunkOffset;
lastChunkLen = bytesToRead;
-
- if ((dataLeft == 0 && isLastPacket) || bytesToRead == 0) {
+
+ // If there's no data left in the current packet after satisfying
+ // this read, and we have satisfied the client read, we expect
+ // an empty packet header from the DN to signify this.
+ // Note that pos + bytesToRead may in fact be greater since the
+ // DN finishes off the entire last chunk.
+ if (dataLeft == 0 &&
+ pos + bytesToRead >= bytesNeededToFinish) {
+
+ // Read header
+ int packetLen = in.readInt();
+ long offsetInBlock = in.readLong();
+ long seqno = in.readLong();
+ boolean lastPacketInBlock = in.readBoolean();
+ int dataLen = in.readInt();
+
+ if (!lastPacketInBlock ||
+ dataLen != 0) {
+ throw new IOException("Expected empty end-of-read packet! Header: " +
+ "(packetLen : " + packetLen +
+ ", offsetInBlock : " + offsetInBlock +
+ ", seqno : " + seqno +
+ ", lastInBlock : " + lastPacketInBlock +
+ ", dataLen : " + dataLen);
+ }
+
gotEOS = true;
}
+
if ( bytesToRead == 0 ) {
return -1;
}
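To make the new end-of-read condition concrete, here is a worked example with hypothetical numbers (512-byte chunks, a 700-byte read starting on a chunk boundary):

    long bytesNeededToFinish = 700; // requested bytes + alignment padding
    long pos = 512;                 // after consuming the first chunk
    int bytesToRead = 512;          // the DN finishes off the whole chunk
    // 512 + 512 = 1024 >= 700, so once this packet drains the client
    // expects the empty trailer (lastPacketInBlock == true, dataLen == 0)
    // and only then sets gotEOS.
    boolean expectTrailer = pos + bytesToRead >= bytesNeededToFinish;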
@@ -1561,7 +1602,8 @@
private BlockReader( String file, long blockId, DataInputStream in,
DataChecksum checksum, boolean verifyChecksum,
- long startOffset, long firstChunkOffset,
+ long startOffset, long firstChunkOffset,
+ long bytesToRead,
Socket dnSock ) {
super(new Path("/blk_" + blockId + ":of:" + file)/*too non path-like?*/,
1, verifyChecksum,
@@ -1574,6 +1616,12 @@
this.checksum = checksum;
this.startOffset = Math.max( startOffset, 0 );
+ // The total number of bytes that we need to transfer from the DN is
+ // the amount that the user wants (bytesToRead), plus the padding at
+ // the beginning in order to chunk-align. Note that the DN may elect
+ // to send more than this amount if the read ends mid-chunk.
+ this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset);
+
this.firstChunkOffset = firstChunkOffset;
lastChunkOffset = firstChunkOffset;
lastChunkLen = -1;
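The padding arithmetic in that assignment, traced with hypothetical numbers and the default 512-byte checksum chunk:

    long startOffset = 700;      // where the caller wants to begin
    long firstChunkOffset = 512; // aligned down: 700 - (700 % 512)
    long bytesToRead = 1000;     // what the caller asked for
    // 188 padding bytes are fetched and discarded so verification can
    // start on a chunk boundary:
    long bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset);
    // bytesNeededToFinish == 1188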
@@ -1650,7 +1698,8 @@
}
return new BlockReader( file, blockId, in, checksum, verifyChecksum,
- startOffset, firstChunkOffset, sock );
+ startOffset, firstChunkOffset, len,
+ sock );
}
@Override
@@ -1671,7 +1720,7 @@
* errors, we send OP_STATUS_CHECKSUM_OK to datanode to inform that
* checksum was verified and there was no error.
*/
- private void checksumOk(Socket sock) {
+ void checksumOk(Socket sock) {
try {
OutputStream out = NetUtils.getOutputStream(sock, HdfsConstants.WRITE_TIMEOUT);
CHECKSUM_OK.writeOutputStream(out);
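Relaxing checksumOk (and dnSock above) from private to package-private is what lets the new test observe the handshake with a Mockito spy, in the style of the test below:

    BlockReader reader = spy(getBlockReader(0, FILE_SIZE_K * 1024));
    slurpReader(reader, FILE_SIZE_K * 1024, true);
    verify(reader).checksumOk(reader.dnSock); // fails if no ack was sent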
Added: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java?rev=902024&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java (added)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java Fri Jan 22 07:53:27 2010
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import java.net.Socket;
+import java.net.InetSocketAddress;
+import java.io.DataOutputStream;
+import java.util.Random;
+import java.util.List;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSClient.BlockReader;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.net.NetUtils;
+
+import org.junit.Test;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.never;
+import static org.junit.Assert.*;
+
+public class TestClientBlockVerification {
+ static MiniDFSCluster cluster = null;
+ static Configuration conf = null;
+ static FileSystem fs = null;
+ static final Path TEST_FILE = new Path("/test.file");
+ static final int FILE_SIZE_K = 256;
+ static LocatedBlock testBlock = null;
+
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ conf = new HdfsConfiguration();
+ int numDataNodes = 1;
+ conf.setInt("dfs.replication", numDataNodes);
+ cluster = new MiniDFSCluster(conf, numDataNodes, true, null);
+ cluster.waitActive();
+ fs = cluster.getFileSystem();
+
+ // Write a file with 256K of data
+ DataOutputStream os = fs.create(TEST_FILE);
+ byte data[] = new byte[1024];
+ new Random().nextBytes(data);
+ for (int i = 0; i < FILE_SIZE_K; i++) {
+ os.write(data);
+ }
+ os.close();
+
+ // Locate the block we just wrote
+ DFSClient dfsclient = new DFSClient(
+ new InetSocketAddress("localhost",
+ cluster.getNameNodePort()), conf);
+ List<LocatedBlock> locatedBlocks = dfsclient.getNamenode().getBlockLocations(
+ TEST_FILE.toString(), 0, FILE_SIZE_K * 1024).getLocatedBlocks();
+ testBlock = locatedBlocks.get(0); // first block
+ }
+
+ private BlockReader getBlockReader(
+ int offset, int lenToRead) throws IOException {
+ InetSocketAddress targetAddr = null;
+ Socket s = null;
+ BlockReader blockReader = null;
+ Block block = testBlock.getBlock();
+ DatanodeInfo[] nodes = testBlock.getLocations();
+ targetAddr = NetUtils.createSocketAddr(nodes[0].getName());
+ s = new Socket();
+ s.connect(targetAddr, HdfsConstants.READ_TIMEOUT);
+ s.setSoTimeout(HdfsConstants.READ_TIMEOUT);
+
+ return DFSClient.BlockReader.newBlockReader(
+ s, targetAddr.toString()+ ":" + block.getBlockId(), block.getBlockId(),
+ testBlock.getAccessToken(), block.getGenerationStamp(),
+ offset, lenToRead,
+ conf.getInt("io.file.buffer.size", 4096));
+ }
+
+ /**
+ * Verify that if we read an entire block, we send checksumOk
+ */
+ @Test
+ public void testBlockVerification() throws Exception {
+ BlockReader reader = spy(getBlockReader(0, FILE_SIZE_K * 1024));
+ slurpReader(reader, FILE_SIZE_K * 1024, true);
+ verify(reader).checksumOk(reader.dnSock);
+ reader.close();
+ }
+
+ /**
+ * Test that if we do an incomplete read, we don't call checksumOk
+ */
+ @Test
+ public void testIncompleteRead() throws Exception {
+ BlockReader reader = spy(getBlockReader(0, FILE_SIZE_K * 1024));
+ slurpReader(reader, FILE_SIZE_K / 2 * 1024, false);
+
+ // We asked the blockreader for the whole file, and only read
+ // half of it, so no checksumOk
+ verify(reader, never()).checksumOk(reader.dnSock);
+ reader.close();
+ }
+
+ /**
+ * Test that if we ask for a half block, and read it all, we *do*
+ * call checksumOk. The DN takes care of knowing whether it was
+ * the whole block or not.
+ */
+ @Test
+ public void testCompletePartialRead() throws Exception {
+ // Ask for half the file
+ BlockReader reader = spy(getBlockReader(0, FILE_SIZE_K * 1024 / 2));
+ // And read half the file
+ slurpReader(reader, FILE_SIZE_K * 1024 / 2, true);
+ verify(reader).checksumOk(reader.dnSock);
+ reader.close();
+ }
+
+ /**
+ * Test various unaligned reads to make sure that we properly
+ * account even when we don't start or end on a checksum boundary
+ */
+ @Test
+ public void testUnalignedReads() throws Exception {
+ int startOffsets[] = new int[] { 0, 3, 129 };
+ int lengths[] = new int[] { 30, 300, 512, 513, 1025 };
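+ // These lengths straddle the default 512-byte checksum chunk: 512 ends
+ // exactly on a chunk boundary, 513 one byte past it, and 1025 two full
+ // chunks plus a byte, so aligned and mid-chunk endings are both covered.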
+ for (int startOffset : startOffsets) {
+ for (int length : lengths) {
+ DFSClient.LOG.info("Testing startOffset = " + startOffset +
+ " and len=" + length);
+ BlockReader reader = spy(getBlockReader(startOffset, length));
+ slurpReader(reader, length, true);
+ verify(reader).checksumOk(reader.dnSock);
+ reader.close();
+ }
+ }
+ }
+
+
+ /**
+ * Read the given length from the given block reader.
+ *
+ * @param expectEof if true, will expect an EOF response when done
+ */
+ private void slurpReader(BlockReader reader, int length, boolean expectEof)
+ throws IOException {
+ byte buf[] = new byte[1024];
+ int nRead = 0;
+ while (nRead < length) {
+ DFSClient.LOG.info("So far read " + nRead + " - going to read more.");
+ int n = reader.read(buf, 0, buf.length);
+ assertTrue(n > 0);
+ nRead += n;
+ }
+ DFSClient.LOG.info("Done reading, expect EOF for next read.");
+ if (expectEof) {
+ assertEquals(-1, reader.read(buf, 0, buf.length));
+ }
+ }
+
+ @AfterClass
+ public static void teardownCluster() throws Exception {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+}
\ No newline at end of file