Posted to hdfs-commits@hadoop.apache.org by el...@apache.org on 2010/11/19 10:08:06 UTC

svn commit: r1036767 - in /hadoop/hdfs/trunk: ./ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/server/datanode/ src/test/hdfs/org/apache/hadoop/hdfs/ src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/

Author: eli
Date: Fri Nov 19 09:08:06 2010
New Revision: 1036767

URL: http://svn.apache.org/viewvc?rev=1036767&view=rev
Log:
HDFS-1001. DataXceiver and BlockReader disagree on when to send/recv CHECKSUM_OK. Contributed by bc Wong
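
In protocol terms, the fix makes both ends agree on the optional trailing status:
the BlockReader sends CHECKSUM_OK once it has read everything it asked for without
checksum errors (even if that was only part of the block), and the DataXceiver now
always attempts to read that status, notifying the block scanner only when the whole
block was read. A rough sketch of the exchange after this patch (stream names are
illustrative, not the exact identifiers; the real code is in the hunks below):

    // Client side (BlockReader): after finishing the requested range with no
    // checksum errors, write the optional 2-byte status.
    CHECKSUM_OK.write(clientOut);

    // Server side (DataXceiver): always try to read the trailing status.
    if (DataTransferProtocol.Status.read(serverIn) == CHECKSUM_OK
        && blockSender.isBlockReadFully()       // only a full-block read counts
        && datanode.blockScanner != null) {
      datanode.blockScanner.verifiedByClient(block);
    }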

Added:
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataXceiver.java
Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/BlockReader.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=1036767&r1=1036766&r2=1036767&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Fri Nov 19 09:08:06 2010
@@ -390,6 +390,9 @@ Release 0.22.0 - Unreleased
     HDFS-1487. FSDirectory.removeBlock() should update diskspace count 
     of the block owner node (Zhong Wang via eli).
 
+    HDFS-1001. DataXceiver and BlockReader disagree on when to send/recv
+    CHECKSUM_OK. (bc Wong via eli)
+
 Release 0.21.1 - Unreleased
 
   IMPROVEMENTS

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/BlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/BlockReader.java?rev=1036767&r1=1036766&r2=1036767&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/BlockReader.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/BlockReader.java Fri Nov 19 09:08:06 2010
@@ -435,7 +435,7 @@ public class BlockReader extends FSInput
     return readFully(this, buf, offset, len);
   }
   
-  /* When the reader reaches end of a block and there are no checksum
+  /* When the reader reaches end of the read and there are no checksum
    * errors, we send OP_STATUS_CHECKSUM_OK to datanode to inform that 
    * checksum was verified and there was no error.
    */ 
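
The rewording matters for partial-range reads: "end of the read" means the caller got
all the bytes it requested, even if that was only part of the block. A minimal sketch
of the condition this comment documents (gotEOS and checksumError are illustrative
names, not necessarily the exact BlockReader fields):

    // Sketch of the end-of-read path inside BlockReader; names are illustrative.
    if (gotEOS && !checksumError) {
      // The requested range has been fully read and verified, so tell the
      // DataNode by writing the optional 2-byte CHECKSUM_OK status.
      checksumOk(dnSock);
    }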

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1036767&r1=1036766&r2=1036767&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Fri Nov 19 09:08:06 2010
@@ -1284,10 +1284,12 @@ public class DataNode extends Configured
     "LastPacketInBlock" set to true or with a zero length. If there is 
     no checksum error, it replies to DataNode with OP_STATUS_CHECKSUM_OK:
     
-    Client optional response at the end of data transmission :
+    Client optional response at the end of data transmission of any length:
       +------------------------------+
       | 2 byte OP_STATUS_CHECKSUM_OK |
       +------------------------------+
+    The DataNode always checks OP_STATUS_CHECKSUM_OK. It will close the
+    client connection if it is absent.
     
     PACKET : Contains a packet header, checksum and data. Amount of data
     ======== carried is set by BUFFER_SIZE.
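
The 2-byte figure in the diagram above corresponds to the status being written as a
short. A sketch of what that optional trailer looks like on the wire, assuming the
status is serialized as a plain short code (the real serialization lives in
DataTransferProtocol.Status):

    // Client -> DataNode, optional: 2 bytes identifying CHECKSUM_OK.
    out.writeShort(checksumOkCode);
    // DataNode side: read it back; if the client closed without sending it, the
    // read fails and the DataXceiver simply closes the connection as usual.
    int code = in.readShort();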

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1036767&r1=1036766&r2=1036767&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Fri Nov 19 09:08:06 2010
@@ -195,16 +195,17 @@ class DataXceiver extends DataTransferPr
       SUCCESS.write(out); // send op status
       long read = blockSender.sendBlock(out, baseStream, null); // send data
 
-      if (blockSender.isBlockReadFully()) {
-        // See if client verification succeeded. 
-        // This is an optional response from client.
-        try {
-          if (DataTransferProtocol.Status.read(in) == CHECKSUM_OK
-              && datanode.blockScanner != null) {
+      // If client verification succeeded, and if it's for the whole block,
+      // tell the DataBlockScanner that it's good. This is an optional response
+      // from the client. If absent, we close the connection (which is what we
+      // always do anyways).
+      try {
+        if (DataTransferProtocol.Status.read(in) == CHECKSUM_OK) {
+          if (blockSender.isBlockReadFully() && datanode.blockScanner != null) {
             datanode.blockScanner.verifiedByClient(block);
           }
-        } catch (IOException ignored) {}
-      }
+        }
+      } catch (IOException ignored) {}
       
       datanode.myMetrics.bytesRead.inc((int) read);
       datanode.myMetrics.blocksRead.inc();

Added: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/BlockReaderTestUtil.java?rev=1036767&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/BlockReaderTestUtil.java (added)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/BlockReaderTestUtil.java Fri Nov 19 09:08:06 2010
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import java.net.Socket;
+import java.net.InetSocketAddress;
+import java.io.DataOutputStream;
+import java.util.Random;
+import java.util.List;
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.net.NetUtils;
+
+import static org.junit.Assert.*;
+
+/**
+ * A helper class to setup the cluster, and get to BlockReader and DataNode for a block.
+ */
+public class BlockReaderTestUtil {
+
+  private HdfsConfiguration conf = null;
+  private MiniDFSCluster cluster = null;
+
+  /**
+   * Setup the cluster
+   */
+  public BlockReaderTestUtil(int replicationFactor) throws Exception {
+    conf = new HdfsConfiguration();
+    conf.setInt("dfs.replication", replicationFactor);
+    cluster = new MiniDFSCluster.Builder(conf).format(true).build();
+    cluster.waitActive();
+  }
+
+  /**
+   * Shutdown cluster
+   */
+  public void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  public MiniDFSCluster getCluster() {
+    return cluster;
+  }
+
+  public HdfsConfiguration getConf() {
+    return conf;
+  }
+
+  /**
+   * Create a file of the given size filled with random data.
+   * @return  List of Blocks of the new file.
+   */
+  public List<LocatedBlock> writeFile(Path filepath, int sizeKB)
+      throws IOException {
+    FileSystem fs = cluster.getFileSystem();
+
+    // Write a file with the specified amount of data
+    DataOutputStream os = fs.create(filepath);
+    byte data[] = new byte[1024];
+    new Random().nextBytes(data);
+    for (int i = 0; i < sizeKB; i++) {
+      os.write(data);
+    }
+    os.close();
+
+    // Return the blocks we just wrote
+    DFSClient dfsclient = new DFSClient(
+      new InetSocketAddress("localhost", cluster.getNameNodePort()), conf);
+    return dfsclient.getNamenode().getBlockLocations(
+      filepath.toString(), 0, sizeKB * 1024).getLocatedBlocks();
+  }
+
+
+  /**
+   * Exercise the BlockReader and read length bytes.
+   *
+   * It does not verify the bytes read.
+   */
+  public void readAndCheckEOS(BlockReader reader, int length, boolean expectEof)
+      throws IOException {
+    byte buf[] = new byte[1024];
+    int nRead = 0;
+    while (nRead < length) {
+      DFSClient.LOG.info("So far read " + nRead + " - going to read more.");
+      int n = reader.read(buf, 0, buf.length);
+      assertTrue(n > 0);
+      nRead += n;
+    }
+
+    if (expectEof) {
+      DFSClient.LOG.info("Done reading, expect EOF for next read.");
+      assertEquals(-1, reader.read(buf, 0, buf.length));
+    }
+  }
+
+  /**
+   * Get a BlockReader for the given block.
+   */
+  public BlockReader getBlockReader(LocatedBlock testBlock, int offset, int lenToRead)
+      throws IOException {
+    InetSocketAddress targetAddr = null;
+    Socket sock = null;
+    Block block = testBlock.getBlock();
+    DatanodeInfo[] nodes = testBlock.getLocations();
+    targetAddr = NetUtils.createSocketAddr(nodes[0].getName());
+    sock = new Socket();
+    sock.connect(targetAddr, HdfsConstants.READ_TIMEOUT);
+    sock.setSoTimeout(HdfsConstants.READ_TIMEOUT);
+
+    return BlockReader.newBlockReader(
+      sock, targetAddr.toString()+ ":" + block.getBlockId(), block,
+      testBlock.getBlockToken(), 
+      offset, lenToRead,
+      conf.getInt("io.file.buffer.size", 4096));
+  }
+
+  /**
+   * Get a DataNode that serves our testBlock.
+   */
+  public DataNode getDataNode(LocatedBlock testBlock) {
+    DatanodeInfo[] nodes = testBlock.getLocations();
+    int ipcport = nodes[0].ipcPort;
+    return cluster.getDataNode(ipcport);
+  }
+
+}
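
For reference, the helper is used roughly like this, mirroring the tests below (the
path and sizes are just the values those tests happen to use):

    BlockReaderTestUtil util = new BlockReaderTestUtil(1);    // replication factor 1
    List<LocatedBlock> blocks = util.writeFile(new Path("/test.file"), 256);
    BlockReader reader = util.getBlockReader(blocks.get(0), 0, 256 * 1024);
    util.readAndCheckEOS(reader, 256 * 1024, true);           // expect EOF afterwards
    reader.close();
    util.shutdown();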

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java?rev=1036767&r1=1036766&r2=1036767&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java Fri Nov 19 09:08:06 2010
@@ -18,21 +18,10 @@
 
 package org.apache.hadoop.hdfs;
 
-import java.net.Socket;
-import java.net.InetSocketAddress;
-import java.io.DataOutputStream;
-import java.util.Random;
 import java.util.List;
-import java.io.IOException;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.net.NetUtils;
 
 import org.junit.Test;
 import org.junit.AfterClass;
@@ -44,56 +33,18 @@ import static org.mockito.Mockito.never;
 import static org.junit.Assert.*;
 
 public class TestClientBlockVerification {
-  static MiniDFSCluster cluster = null;
-  static Configuration conf = null;
-  static FileSystem fs = null;
+
+  static BlockReaderTestUtil util = null;
   static final Path TEST_FILE = new Path("/test.file");
   static final int FILE_SIZE_K = 256;
   static LocatedBlock testBlock = null;
 
   @BeforeClass
   public static void setupCluster() throws Exception {
-    conf = new HdfsConfiguration();
-    int numDataNodes = 1;
-    conf.setInt("dfs.replication", numDataNodes);
-    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
-    cluster.waitActive();
-    fs = cluster.getFileSystem();
-
-    // Write a file with 256K of data
-    DataOutputStream os = fs.create(TEST_FILE);
-    byte data[] = new byte[1024];
-    new Random().nextBytes(data);
-    for (int i = 0; i < FILE_SIZE_K; i++) {
-      os.write(data);
-    }
-    os.close();
-
-    // Locate the block we just wrote
-    DFSClient dfsclient = new DFSClient(
-      new InetSocketAddress("localhost",
-                            cluster.getNameNodePort()), conf);
-    List<LocatedBlock> locatedBlocks = dfsclient.getNamenode().getBlockLocations(
-      TEST_FILE.toString(), 0, FILE_SIZE_K * 1024).getLocatedBlocks();
-    testBlock = locatedBlocks.get(0); // first block
-  }
-
-  private BlockReader getBlockReader(
-    int offset, int lenToRead) throws IOException {
-    InetSocketAddress targetAddr = null;
-    Socket s = null;
-    Block block = testBlock.getBlock();
-    DatanodeInfo[] nodes = testBlock.getLocations();
-    targetAddr = NetUtils.createSocketAddr(nodes[0].getName());
-    s = new Socket();
-    s.connect(targetAddr, HdfsConstants.READ_TIMEOUT);
-    s.setSoTimeout(HdfsConstants.READ_TIMEOUT);
-
-    String file = BlockReader.getFileName(targetAddr,
-        block.getBlockId());
-    return BlockReader.newBlockReader(s, file, block,
-        testBlock.getBlockToken(), offset, lenToRead, conf.getInt(
-            "io.file.buffer.size", 4096));
+    final int REPLICATION_FACTOR = 1;
+    util = new BlockReaderTestUtil(REPLICATION_FACTOR);
+    List<LocatedBlock> blkList = util.writeFile(TEST_FILE, FILE_SIZE_K);
+    testBlock = blkList.get(0);     // Use the first block to test
   }
 
   /**
@@ -101,8 +52,8 @@ public class TestClientBlockVerification
    */
   @Test
   public void testBlockVerification() throws Exception {
-    BlockReader reader = spy(getBlockReader(0, FILE_SIZE_K * 1024));
-    slurpReader(reader, FILE_SIZE_K * 1024, true);
+    BlockReader reader = spy(util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024));
+    util.readAndCheckEOS(reader, FILE_SIZE_K * 1024, true);
     verify(reader).checksumOk(reader.dnSock);
     reader.close();
   }
@@ -112,8 +63,8 @@ public class TestClientBlockVerification
    */
   @Test
   public void testIncompleteRead() throws Exception {
-    BlockReader reader = spy(getBlockReader(0, FILE_SIZE_K * 1024));
-    slurpReader(reader, FILE_SIZE_K / 2 * 1024, false);
+    BlockReader reader = spy(util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024));
+    util.readAndCheckEOS(reader, FILE_SIZE_K / 2 * 1024, false);
 
     // We asked the blockreader for the whole file, and only read
     // half of it, so no checksumOk
@@ -129,9 +80,9 @@ public class TestClientBlockVerification
   @Test
   public void testCompletePartialRead() throws Exception {
     // Ask for half the file
-    BlockReader reader = spy(getBlockReader(0, FILE_SIZE_K * 1024 / 2));
+    BlockReader reader = spy(util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024 / 2));
     // And read half the file
-    slurpReader(reader, FILE_SIZE_K * 1024 / 2, true);
+    util.readAndCheckEOS(reader, FILE_SIZE_K * 1024 / 2, true);
     verify(reader).checksumOk(reader.dnSock);
     reader.close();
   }
@@ -148,8 +99,8 @@ public class TestClientBlockVerification
       for (int length : lengths) {
         DFSClient.LOG.info("Testing startOffset = " + startOffset + " and " +
                            " len=" + length);
-        BlockReader reader = spy(getBlockReader(startOffset, length));
-        slurpReader(reader, length, true);
+        BlockReader reader = spy(util.getBlockReader(testBlock, startOffset, length));
+        util.readAndCheckEOS(reader, length, true);
         verify(reader).checksumOk(reader.dnSock);
         reader.close();
       }
@@ -157,32 +108,9 @@ public class TestClientBlockVerification
   }
 
 
-  /**
-   * Read the given length from the given block reader.
-   *
-   * @param expectEOF if true, will expect an eof response when done
-   */
-  private void slurpReader(BlockReader reader, int length, boolean expectEof)
-    throws IOException {
-    byte buf[] = new byte[1024];
-    int nRead = 0;
-    while (nRead < length) {
-      DFSClient.LOG.info("So far read " + nRead + " - going to read more.");
-      int n = reader.read(buf, 0, buf.length);
-      assertTrue(n > 0);
-      nRead += n;
-    }
-    DFSClient.LOG.info("Done reading, expect EOF for next read.");
-    if (expectEof) {
-      assertEquals(-1, reader.read(buf, 0, buf.length));
-    }
-  }
-
   @AfterClass
   public static void teardownCluster() throws Exception {
-    if (cluster != null) {
-      cluster.shutdown();
-    }
+    util.shutdown();
   }
 
-}
\ No newline at end of file
+}

Added: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataXceiver.java?rev=1036767&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataXceiver.java (added)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataXceiver.java Fri Nov 19 09:08:06 2010
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.BlockReader;
+import org.apache.hadoop.hdfs.BlockReaderTestUtil;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+
+import org.junit.Test;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.mockito.Mockito;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.never;
+
+public class TestDataXceiver {
+  static BlockReaderTestUtil util = null;
+  static final Path TEST_FILE = new Path("/test.file");
+  static final int FILE_SIZE_K = 256;
+  static LocatedBlock testBlock = null;
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    final int REPLICATION_FACTOR = 1;
+    util = new BlockReaderTestUtil(REPLICATION_FACTOR);
+    List<LocatedBlock> blkList = util.writeFile(TEST_FILE, FILE_SIZE_K);
+    testBlock = blkList.get(0);     // Use the first block to test
+  }
+
+  /**
+   * Test that we don't call verifiedByClient() when the client only
+   * reads a partial block.
+   */
+  @Test
+  public void testCompletePartialRead() throws Exception {
+    // Ask for half the file
+    BlockReader reader = util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024 / 2);
+    DataNode dn = util.getDataNode(testBlock);
+    DataBlockScanner scanner = spy(dn.blockScanner);
+    dn.blockScanner = scanner;
+
+    // And read half the file
+    util.readAndCheckEOS(reader, FILE_SIZE_K * 1024 / 2, true);
+    verify(scanner, never()).verifiedByClient(Mockito.isA(Block.class));
+    reader.close();
+  }
+
+  @AfterClass
+  public static void teardownCluster() throws Exception {
+    util.shutdown();
+  }
+}