Posted to hdfs-commits@hadoop.apache.org by ha...@apache.org on 2010/01/22 08:53:32 UTC

svn commit: r902024 - in /hadoop/hdfs/trunk: CHANGES.txt src/java/org/apache/hadoop/hdfs/DFSClient.java src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java

Author: hairong
Date: Fri Jan 22 07:53:27 2010
New Revision: 902024

URL: http://svn.apache.org/viewvc?rev=902024&view=rev
Log:
HDFS-877. Client-driven block verification not functioning. Contributed by Todd Lipcon.

Added:
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java
Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
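
The core of the change is new accounting in DFSClient.BlockReader: the reader
now remembers how many bytes it must pull from the datanode (the amount the
caller asked for, plus the padding needed so the read starts on a checksum-chunk
boundary) and declares end-of-stream once that amount has been delivered,
rather than relying on the removed isLastPacket flag. The following standalone
sketch is illustrative only and not part of the patch; the class name and the
concrete values (a 512-byte checksum chunk, offsets taken from the new
unaligned-read test) are assumptions:

    // Illustrative sketch of the bytesNeededToFinish accounting introduced
    // by this patch; the class and the concrete values are hypothetical.
    public class BlockReadAccountingSketch {
      public static void main(String[] args) {
        long bytesPerChecksum = 512;  // checksum chunk size (assumed)
        long startOffset = 129;       // where the caller wants to start reading
        long bytesToRead = 1025;      // how many bytes the caller asked for

        // The datanode always starts sending at a chunk boundary, so the first
        // chunk may begin before startOffset.
        long firstChunkOffset = (startOffset / bytesPerChecksum) * bytesPerChecksum;

        // Total bytes the reader must transfer: the requested length plus the
        // front padding needed to chunk-align the read.
        long bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset);

        System.out.println("firstChunkOffset    = " + firstChunkOffset);    // 0
        System.out.println("bytesNeededToFinish = " + bytesNeededToFinish); // 1154
      }
    }

Once the current packet is drained and the bytes delivered so far reach
bytesNeededToFinish, the reader expects an empty trailing packet from the
datanode and can then report successful verification.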

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=902024&r1=902023&r2=902024&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Fri Jan 22 07:53:27 2010
@@ -91,6 +91,9 @@
     
     HDFS-885. Datanode toString() NPEs on null dnRegistration. (stevel)
 
+    HDFS-877. Client-driven block verification not functioning. (Todd
+    Lipcon via hairong)
+
 Release 0.21.0 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=902024&r1=902023&r2=902024&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java Fri Jan 22 07:53:27 2010
@@ -1322,23 +1322,37 @@
    */
   public static class BlockReader extends FSInputChecker {
 
-    private Socket dnSock; //for now just sending checksumOk.
+    Socket dnSock; //for now just sending checksumOk.
     private DataInputStream in;
     private DataChecksum checksum;
+
+    /** offset in block of the last chunk received */
     private long lastChunkOffset = -1;
     private long lastChunkLen = -1;
     private long lastSeqNo = -1;
 
+    /** offset in block where reader wants to actually read */
     private long startOffset;
-    private long firstChunkOffset;
+
+    /** offset in block of first chunk - may be less than startOffset
+        if startOffset is not chunk-aligned */
+    private final long firstChunkOffset;
+
     private int bytesPerChecksum;
     private int checksumSize;
+
+    /**
+     * The total number of bytes we need to transfer from the DN.
+     * This is the amount that the user has requested plus some padding
+     * at the beginning so that the read can begin on a chunk boundary.
+     */
+    private final long bytesNeededToFinish;
+
     private boolean gotEOS = false;
     
     byte[] skipBuf = null;
     ByteBuffer checksumBytes = null;
     int dataLeft = 0;
-    boolean isLastPacket = false;
     
     /* FSInputChecker interface */
     
@@ -1353,6 +1367,11 @@
     public synchronized int read(byte[] buf, int off, int len) 
                                  throws IOException {
       
+      // This has to be set here, *before* the skip, since we can
+      // hit EOS during the skip, in the case that our entire read
+      // is smaller than the checksum chunk.
+      boolean eosBefore = gotEOS;
+
       //for the first read, skip the extra bytes at the front.
       if (lastChunkLen < 0 && startOffset > firstChunkOffset && len > 0) {
         // Skip these bytes. But don't call this.skip()!
@@ -1366,7 +1385,6 @@
         }
       }
       
-      boolean eosBefore = gotEOS;
       int nRead = super.read(buf, off, len);
       
       // if gotEOS was set in the previous read and checksum is enabled :
@@ -1444,13 +1462,8 @@
                                          int len, byte[] checksumBuf) 
                                          throws IOException {
       // Read one chunk.
-      
       if ( gotEOS ) {
-        if ( startOffset < 0 ) {
-          //This is mainly for debugging. can be removed.
-          throw new IOException( "BlockRead: already got EOS or an error" );
-        }
-        startOffset = -1;
+        // Already hit EOF
         return -1;
       }
       
@@ -1460,6 +1473,10 @@
         chunkOffset += lastChunkLen;
       }
       
+      // pos is relative to the start of the first chunk of the read.
+      // chunkOffset is relative to the start of the block.
+      // This makes sure that the read passed from FSInputChecker is for
+      // the same chunk we expect to be reading from the DN.
       if ( (pos + firstChunkOffset) != chunkOffset ) {
         throw new IOException("Mismatch in pos : " + pos + " + " + 
                               firstChunkOffset + " != " + chunkOffset);
@@ -1494,7 +1511,6 @@
         }
         
         lastSeqNo = seqno;
-        isLastPacket = lastPacketInBlock;
         dataLeft = dataLen;
         adjustChecksumBytes(dataLen);
         if (dataLen > 0) {
@@ -1542,16 +1558,41 @@
         IOUtils.readFully(in, buf, offset, bytesToRead);
         checksumBytes.get(checksumBuf, 0, checksumSize * checksumsToRead);
       }
-      
+
       dataLeft -= bytesToRead;
       assert dataLeft >= 0;
 
       lastChunkOffset = chunkOffset;
       lastChunkLen = bytesToRead;
-      
-      if ((dataLeft == 0 && isLastPacket) || bytesToRead == 0) {
+
+      // If there's no data left in the current packet after satisfying
+      // this read, and we have satisfied the client read, we expect
+      // an empty packet header from the DN to signify this.
+      // Note that pos + bytesToRead may in fact be greater since the
+      // DN finishes off the entire last chunk.
+      if (dataLeft == 0 &&
+          pos + bytesToRead >= bytesNeededToFinish) {
+
+        // Read header
+        int packetLen = in.readInt();
+        long offsetInBlock = in.readLong();
+        long seqno = in.readLong();
+        boolean lastPacketInBlock = in.readBoolean();
+        int dataLen = in.readInt();
+
+        if (!lastPacketInBlock ||
+            dataLen != 0) {
+          throw new IOException("Expected empty end-of-read packet! Header: " +
+                                "(packetLen : " + packetLen + 
+                                ", offsetInBlock : " + offsetInBlock +
+                                ", seqno : " + seqno + 
+                                ", lastInBlock : " + lastPacketInBlock +
+                                ", dataLen : " + dataLen + ")");
+        }
+
         gotEOS = true;
       }
+
       if ( bytesToRead == 0 ) {
         return -1;
       }
@@ -1561,7 +1602,8 @@
     
     private BlockReader( String file, long blockId, DataInputStream in, 
                          DataChecksum checksum, boolean verifyChecksum,
-                         long startOffset, long firstChunkOffset, 
+                         long startOffset, long firstChunkOffset,
+                         long bytesToRead,
                          Socket dnSock ) {
       super(new Path("/blk_" + blockId + ":of:" + file)/*too non path-like?*/,
             1, verifyChecksum,
@@ -1574,6 +1616,12 @@
       this.checksum = checksum;
       this.startOffset = Math.max( startOffset, 0 );
 
+      // The total number of bytes that we need to transfer from the DN is
+      // the amount that the user wants (bytesToRead), plus the padding at
+      // the beginning in order to chunk-align. Note that the DN may elect
+      // to send more than this amount if the read ends mid-chunk.
+      this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset);
+
       this.firstChunkOffset = firstChunkOffset;
       lastChunkOffset = firstChunkOffset;
       lastChunkLen = -1;
@@ -1650,7 +1698,8 @@
       }
 
       return new BlockReader( file, blockId, in, checksum, verifyChecksum,
-                              startOffset, firstChunkOffset, sock );
+                              startOffset, firstChunkOffset, len,
+                              sock );
     }
 
     @Override
@@ -1671,7 +1720,7 @@
      * errors, we send OP_STATUS_CHECKSUM_OK to datanode to inform that 
      * checksum was verified and there was no error.
      */ 
-    private void checksumOk(Socket sock) {
+    void checksumOk(Socket sock) {
       try {
         OutputStream out = NetUtils.getOutputStream(sock, HdfsConstants.WRITE_TIMEOUT);
         CHECKSUM_OK.writeOutputStream(out);

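The new end-of-read handshake in readChunk() can be summarized as follows. This
is a condensed, illustrative rendering of the logic added above, not a drop-in
replacement for the patched method:

    import java.io.DataInputStream;
    import java.io.IOException;

    // Condensed sketch: once the requested range has been satisfied, the
    // reader expects one final, empty packet header from the datanode.
    class EndOfReadSketch {
      static void expectEmptyEndOfReadPacket(DataInputStream in) throws IOException {
        int packetLen = in.readInt();
        long offsetInBlock = in.readLong();
        long seqno = in.readLong();
        boolean lastPacketInBlock = in.readBoolean();
        int dataLen = in.readInt();

        // A well-behaved datanode marks this packet last-in-block and sends
        // no data with it.
        if (!lastPacketInBlock || dataLen != 0) {
          throw new IOException("Expected empty end-of-read packet! Header: " +
                                "(packetLen : " + packetLen +
                                ", offsetInBlock : " + offsetInBlock +
                                ", seqno : " + seqno +
                                ", lastInBlock : " + lastPacketInBlock +
                                ", dataLen : " + dataLen + ")");
        }
        // In the real BlockReader, gotEOS is set at this point; read() notices
        // the EOS transition and sends CHECKSUM_OK back via checksumOk().
      }
    }

Because end-of-stream is now tied to the bytes the client actually requested
rather than to the last packet of the block, checksumOk() fires for a fully
satisfied partial read of a block as well, which is what the new test
exercises.
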
Added: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java?rev=902024&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java (added)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java Fri Jan 22 07:53:27 2010
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import java.net.Socket;
+import java.net.InetSocketAddress;
+import java.io.DataOutputStream;
+import java.util.Random;
+import java.util.List;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSClient.BlockReader;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.net.NetUtils;
+
+import org.junit.Test;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.never;
+import static org.junit.Assert.*;
+
+public class TestClientBlockVerification {
+  static MiniDFSCluster cluster = null;
+  static Configuration conf = null;
+  static FileSystem fs = null;
+  static final Path TEST_FILE = new Path("/test.file");
+  static final int FILE_SIZE_K = 256;
+  static LocatedBlock testBlock = null;
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    conf = new HdfsConfiguration();
+    int numDataNodes = 1;
+    conf.setInt("dfs.replication", numDataNodes);
+    cluster = new MiniDFSCluster(conf, numDataNodes, true, null);
+    cluster.waitActive();
+    fs = cluster.getFileSystem();
+
+    // Write a file with 256K of data
+    DataOutputStream os = fs.create(TEST_FILE);
+    byte data[] = new byte[1024];
+    new Random().nextBytes(data);
+    for (int i = 0; i < FILE_SIZE_K; i++) {
+      os.write(data);
+    }
+    os.close();
+
+    // Locate the block we just wrote
+    DFSClient dfsclient = new DFSClient(
+      new InetSocketAddress("localhost",
+                            cluster.getNameNodePort()), conf);
+    List<LocatedBlock> locatedBlocks = dfsclient.getNamenode().getBlockLocations(
+      TEST_FILE.toString(), 0, FILE_SIZE_K * 1024).getLocatedBlocks();
+    testBlock = locatedBlocks.get(0); // first block
+  }
+
+  private BlockReader getBlockReader(
+    int offset, int lenToRead) throws IOException {
+    InetSocketAddress targetAddr = null;
+    Socket s = null;
+    BlockReader blockReader = null;
+    Block block = testBlock.getBlock();
+    DatanodeInfo[] nodes = testBlock.getLocations();
+    targetAddr = NetUtils.createSocketAddr(nodes[0].getName());
+    s = new Socket();
+    s.connect(targetAddr, HdfsConstants.READ_TIMEOUT);
+    s.setSoTimeout(HdfsConstants.READ_TIMEOUT);
+
+    return DFSClient.BlockReader.newBlockReader(
+      s, targetAddr.toString()+ ":" + block.getBlockId(), block.getBlockId(),
+      testBlock.getAccessToken(), block.getGenerationStamp(),
+      offset, lenToRead,
+      conf.getInt("io.file.buffer.size", 4096));
+  }
+
+  /**
+   * Verify that if we read an entire block, we send checksumOk
+   */
+  @Test
+  public void testBlockVerification() throws Exception {
+    BlockReader reader = spy(getBlockReader(0, FILE_SIZE_K * 1024));
+    slurpReader(reader, FILE_SIZE_K * 1024, true);
+    verify(reader).checksumOk(reader.dnSock);
+    reader.close();
+  }
+
+  /**
+   * Test that if we do an incomplete read, we don't call checksumOk
+   */
+  @Test
+  public void testIncompleteRead() throws Exception {
+    BlockReader reader = spy(getBlockReader(0, FILE_SIZE_K * 1024));
+    slurpReader(reader, FILE_SIZE_K / 2 * 1024, false);
+
+    // We asked the blockreader for the whole file, and only read
+    // half of it, so no checksumOk
+    verify(reader, never()).checksumOk(reader.dnSock);
+    reader.close();
+  }
+
+  /**
+   * Test that if we ask for a half block, and read it all, we *do*
+   * call checksumOk. The DN takes care of knowing whether it was
+   * the whole block or not.
+   */
+  @Test
+  public void testCompletePartialRead() throws Exception {
+    // Ask for half the file
+    BlockReader reader = spy(getBlockReader(0, FILE_SIZE_K * 1024 / 2));
+    // And read half the file
+    slurpReader(reader, FILE_SIZE_K * 1024 / 2, true);
+    verify(reader).checksumOk(reader.dnSock);
+    reader.close();
+  }
+
+  /**
+   * Test various unaligned reads to make sure that we properly
+   * account even when we don't start or end on a checksum boundary
+   */
+  @Test
+  public void testUnalignedReads() throws Exception {
+    int startOffsets[] = new int[] { 0, 3, 129 };
+    int lengths[] = new int[] { 30, 300, 512, 513, 1025 };
+    for (int startOffset : startOffsets) {
+      for (int length : lengths) {
+        DFSClient.LOG.info("Testing startOffset = " + startOffset + " and " +
+                           " len=" + length);
+        BlockReader reader = spy(getBlockReader(startOffset, length));
+        slurpReader(reader, length, true);
+        verify(reader).checksumOk(reader.dnSock);
+        reader.close();
+      }
+    }
+  }
+
+
+  /**
+   * Read the given length from the given block reader.
+   *
+   * @param expectEof if true, expect an EOF response once the requested length has been read
+   */
+  private void slurpReader(BlockReader reader, int length, boolean expectEof)
+    throws IOException {
+    byte buf[] = new byte[1024];
+    int nRead = 0;
+    while (nRead < length) {
+      DFSClient.LOG.info("So far read " + nRead + " - going to read more.");
+      int n = reader.read(buf, 0, buf.length);
+      assertTrue(n > 0);
+      nRead += n;
+    }
+    DFSClient.LOG.info("Done reading, expect EOF for next read.");
+    if (expectEof) {
+      assertEquals(-1, reader.read(buf, 0, buf.length));
+    }
+  }
+
+  @AfterClass
+  public static void teardownCluster() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+}
\ No newline at end of file
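
Assuming checksum verification is left enabled (the default), the same path can
also be exercised through the public FileSystem API: a client that reads each
block to the end of the range it requested will, with this fix, report
successful verification back to the datanode. A minimal sketch, with a
hypothetical path:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ReadWholeFile {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        FSDataInputStream in = fs.open(new Path("/test.file")); // hypothetical path
        try {
          byte[] buf = new byte[4096];
          // Drain the stream; checksums are verified chunk by chunk as we read,
          // and each fully read block lets the client send CHECKSUM_OK to its
          // datanode.
          while (in.read(buf, 0, buf.length) != -1) {
          }
        } finally {
          in.close();
        }
      }
    }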