Posted to hdfs-commits@hadoop.apache.org by el...@apache.org on 2010/11/19 10:08:06 UTC
svn commit: r1036767 - in /hadoop/hdfs/trunk: ./
src/java/org/apache/hadoop/hdfs/
src/java/org/apache/hadoop/hdfs/server/datanode/
src/test/hdfs/org/apache/hadoop/hdfs/
src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/
Author: eli
Date: Fri Nov 19 09:08:06 2010
New Revision: 1036767
URL: http://svn.apache.org/viewvc?rev=1036767&view=rev
Log:
HDFS-1001. DataXceiver and BlockReader disagree on when to send/recv CHECKSUM_OK. Contributed by bc Wong
Added:
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataXceiver.java
Modified:
hadoop/hdfs/trunk/CHANGES.txt
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/BlockReader.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java
Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=1036767&r1=1036766&r2=1036767&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Fri Nov 19 09:08:06 2010
@@ -390,6 +390,9 @@ Release 0.22.0 - Unreleased
HDFS-1487. FSDirectory.removeBlock() should update diskspace count
of the block owner node (Zhong Wang via eli).
+ HDFS-1001. DataXceiver and BlockReader disagree on when to send/recv
+ CHECKSUM_OK. (bc Wong via eli)
+
Release 0.21.1 - Unreleased
IMPROVEMENTS
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/BlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/BlockReader.java?rev=1036767&r1=1036766&r2=1036767&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/BlockReader.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/BlockReader.java Fri Nov 19 09:08:06 2010
@@ -435,7 +435,7 @@ public class BlockReader extends FSInput
return readFully(this, buf, offset, len);
}
- /* When the reader reaches end of a block and there are no checksum
+ /* When the reader reaches end of the read and there are no checksum
* errors, we send OP_STATUS_CHECKSUM_OK to datanode to inform that
* checksum was verified and there was no error.
*/
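
[Editor's note: the comment change above is the crux of HDFS-1001 on the client
side: the BlockReader sends CHECKSUM_OK when it finishes the range it asked
for, not only when that range happens to end at a block boundary. A minimal
sketch of that behavior, with a hypothetical CHECKSUM_OK_CODE value; the real
path is BlockReader.checksumOk() writing DataTransferProtocol.Status.CHECKSUM_OK:]

    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.net.Socket;

    class ChecksumOkClientSketch {
      // Hypothetical numeric value for illustration; the real code uses
      // the DataTransferProtocol.Status.CHECKSUM_OK enum constant.
      private static final short CHECKSUM_OK_CODE = 5;

      /**
       * Called once the reader has consumed everything it asked for
       * (which may be only part of the block) without a checksum error.
       */
      void checksumOk(Socket dnSock) {
        try {
          DataOutputStream out =
              new DataOutputStream(dnSock.getOutputStream());
          out.writeShort(CHECKSUM_OK_CODE);  // the optional 2-byte trailer
          out.flush();
        } catch (IOException e) {
          // Best effort: the trailer is advisory, so a send failure only
          // means the datanode cannot credit this read to its block scanner.
        }
      }
    }
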
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1036767&r1=1036766&r2=1036767&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Fri Nov 19 09:08:06 2010
@@ -1284,10 +1284,12 @@ public class DataNode extends Configured
"LastPacketInBlock" set to true or with a zero length. If there is
no checksum error, it replies to DataNode with OP_STATUS_CHECKSUM_OK:
- Client optional response at the end of data transmission :
+ Client optional response at the end of data transmission of any length:
+------------------------------+
| 2 byte OP_STATUS_CHECKSUM_OK |
+------------------------------+
+ The DataNode always checks OP_STATUS_CHECKSUM_OK. It will close the
+ client connection if it is absent.
PACKET : Contains a packet header, checksum and data. Amount of data
======== carried is set by BUFFER_SIZE.
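
[Editor's note: on the wire the optional response documented above is a single
2-byte read against the client socket. A minimal sketch of the receiving side,
reusing the hypothetical CHECKSUM_OK_CODE from the client sketch; the real
DataXceiver goes through DataTransferProtocol.Status.read(in) instead:]

    import java.io.DataInputStream;
    import java.io.EOFException;
    import java.io.IOException;

    class ChecksumOkServerSketch {
      private static final short CHECKSUM_OK_CODE = 5; // hypothetical value

      /** Returns true iff the client sent the optional CHECKSUM_OK trailer. */
      boolean clientVerified(DataInputStream in) {
        try {
          return in.readShort() == CHECKSUM_OK_CODE;  // 2-byte status
        } catch (EOFException e) {
          // Trailer absent: the client hung up without sending it.
          return false;
        } catch (IOException e) {
          return false;
        }
      }
    }

[Treating a missing trailer as a normal end-of-stream is what makes the
response optional: a client that never sends it simply looks like a closed
connection, which, per the updated doc above, the datanode closes anyway.]
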
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1036767&r1=1036766&r2=1036767&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Fri Nov 19 09:08:06 2010
@@ -195,16 +195,17 @@ class DataXceiver extends DataTransferPr
SUCCESS.write(out); // send op status
long read = blockSender.sendBlock(out, baseStream, null); // send data
- if (blockSender.isBlockReadFully()) {
- // See if client verification succeeded.
- // This is an optional response from client.
- try {
- if (DataTransferProtocol.Status.read(in) == CHECKSUM_OK
- && datanode.blockScanner != null) {
+ // If client verification succeeded, and if it's for the whole block,
+ // tell the DataBlockScanner that it's good. This is an optional response
+ // from the client. If absent, we close the connection (which is what we
+ // always do anyways).
+ try {
+ if (DataTransferProtocol.Status.read(in) == CHECKSUM_OK) {
+ if (blockSender.isBlockReadFully() && datanode.blockScanner != null) {
datanode.blockScanner.verifiedByClient(block);
}
- } catch (IOException ignored) {}
- }
+ }
+ } catch (IOException ignored) {}
datanode.myMetrics.bytesRead.inc((int) read);
datanode.myMetrics.blocksRead.inc();
Added: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/BlockReaderTestUtil.java?rev=1036767&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/BlockReaderTestUtil.java (added)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/BlockReaderTestUtil.java Fri Nov 19 09:08:06 2010
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import java.net.Socket;
+import java.net.InetSocketAddress;
+import java.io.DataOutputStream;
+import java.util.Random;
+import java.util.List;
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.net.NetUtils;
+
+import static org.junit.Assert.*;
+
+/**
+ * A helper class to set up the cluster, and to get a BlockReader and DataNode for a block.
+ */
+public class BlockReaderTestUtil {
+
+ private HdfsConfiguration conf = null;
+ private MiniDFSCluster cluster = null;
+
+ /**
+ * Setup the cluster
+ */
+ public BlockReaderTestUtil(int replicationFactor) throws Exception {
+ conf = new HdfsConfiguration();
+ conf.setInt("dfs.replication", replicationFactor);
+ cluster = new MiniDFSCluster.Builder(conf).format(true).build();
+ cluster.waitActive();
+ }
+
+ /**
+ * Shutdown cluster
+ */
+ public void shutdown() {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ public MiniDFSCluster getCluster() {
+ return cluster;
+ }
+
+ public HdfsConfiguration getConf() {
+ return conf;
+ }
+
+ /**
+ * Create a file of the given size filled with random data.
+ * @return List of Blocks of the new file.
+ */
+ public List<LocatedBlock> writeFile(Path filepath, int sizeKB)
+ throws IOException {
+ FileSystem fs = cluster.getFileSystem();
+
+ // Write a file with the specified amount of data
+ DataOutputStream os = fs.create(filepath);
+ byte data[] = new byte[1024];
+ new Random().nextBytes(data);
+ for (int i = 0; i < sizeKB; i++) {
+ os.write(data);
+ }
+ os.close();
+
+ // Return the blocks we just wrote
+ DFSClient dfsclient = new DFSClient(
+ new InetSocketAddress("localhost", cluster.getNameNodePort()), conf);
+ return dfsclient.getNamenode().getBlockLocations(
+ filepath.toString(), 0, sizeKB * 1024).getLocatedBlocks();
+ }
+
+
+ /**
+ * Exercise the BlockReader and read length bytes.
+ *
+ * It does not verify the bytes read.
+ */
+ public void readAndCheckEOS(BlockReader reader, int length, boolean expectEof)
+ throws IOException {
+ byte buf[] = new byte[1024];
+ int nRead = 0;
+ while (nRead < length) {
+ DFSClient.LOG.info("So far read " + nRead + " - going to read more.");
+ int n = reader.read(buf, 0, buf.length);
+ assertTrue(n > 0);
+ nRead += n;
+ }
+
+ if (expectEof) {
+ DFSClient.LOG.info("Done reading, expect EOF for next read.");
+ assertEquals(-1, reader.read(buf, 0, buf.length));
+ }
+ }
+
+ /**
+ * Get a BlockReader for the given block.
+ */
+ public BlockReader getBlockReader(LocatedBlock testBlock, int offset, int lenToRead)
+ throws IOException {
+ InetSocketAddress targetAddr = null;
+ Socket sock = null;
+ Block block = testBlock.getBlock();
+ DatanodeInfo[] nodes = testBlock.getLocations();
+ targetAddr = NetUtils.createSocketAddr(nodes[0].getName());
+ sock = new Socket();
+ sock.connect(targetAddr, HdfsConstants.READ_TIMEOUT);
+ sock.setSoTimeout(HdfsConstants.READ_TIMEOUT);
+
+ return BlockReader.newBlockReader(
+ sock, targetAddr.toString()+ ":" + block.getBlockId(), block,
+ testBlock.getBlockToken(),
+ offset, lenToRead,
+ conf.getInt("io.file.buffer.size", 4096));
+ }
+
+ /**
+ * Get a DataNode that serves our testBlock.
+ */
+ public DataNode getDataNode(LocatedBlock testBlock) {
+ DatanodeInfo[] nodes = testBlock.getLocations();
+ int ipcport = nodes[0].ipcPort;
+ return cluster.getDataNode(ipcport);
+ }
+
+}
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java?rev=1036767&r1=1036766&r2=1036767&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java Fri Nov 19 09:08:06 2010
@@ -18,21 +18,10 @@
package org.apache.hadoop.hdfs;
-import java.net.Socket;
-import java.net.InetSocketAddress;
-import java.io.DataOutputStream;
-import java.util.Random;
import java.util.List;
-import java.io.IOException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.server.common.HdfsConstants;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.net.NetUtils;
import org.junit.Test;
import org.junit.AfterClass;
@@ -44,56 +33,18 @@ import static org.mockito.Mockito.never;
import static org.junit.Assert.*;
public class TestClientBlockVerification {
- static MiniDFSCluster cluster = null;
- static Configuration conf = null;
- static FileSystem fs = null;
+
+ static BlockReaderTestUtil util = null;
static final Path TEST_FILE = new Path("/test.file");
static final int FILE_SIZE_K = 256;
static LocatedBlock testBlock = null;
@BeforeClass
public static void setupCluster() throws Exception {
- conf = new HdfsConfiguration();
- int numDataNodes = 1;
- conf.setInt("dfs.replication", numDataNodes);
- cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
- cluster.waitActive();
- fs = cluster.getFileSystem();
-
- // Write a file with 256K of data
- DataOutputStream os = fs.create(TEST_FILE);
- byte data[] = new byte[1024];
- new Random().nextBytes(data);
- for (int i = 0; i < FILE_SIZE_K; i++) {
- os.write(data);
- }
- os.close();
-
- // Locate the block we just wrote
- DFSClient dfsclient = new DFSClient(
- new InetSocketAddress("localhost",
- cluster.getNameNodePort()), conf);
- List<LocatedBlock> locatedBlocks = dfsclient.getNamenode().getBlockLocations(
- TEST_FILE.toString(), 0, FILE_SIZE_K * 1024).getLocatedBlocks();
- testBlock = locatedBlocks.get(0); // first block
- }
-
- private BlockReader getBlockReader(
- int offset, int lenToRead) throws IOException {
- InetSocketAddress targetAddr = null;
- Socket s = null;
- Block block = testBlock.getBlock();
- DatanodeInfo[] nodes = testBlock.getLocations();
- targetAddr = NetUtils.createSocketAddr(nodes[0].getName());
- s = new Socket();
- s.connect(targetAddr, HdfsConstants.READ_TIMEOUT);
- s.setSoTimeout(HdfsConstants.READ_TIMEOUT);
-
- String file = BlockReader.getFileName(targetAddr,
- block.getBlockId());
- return BlockReader.newBlockReader(s, file, block,
- testBlock.getBlockToken(), offset, lenToRead, conf.getInt(
- "io.file.buffer.size", 4096));
+ final int REPLICATION_FACTOR = 1;
+ util = new BlockReaderTestUtil(REPLICATION_FACTOR);
+ List<LocatedBlock> blkList = util.writeFile(TEST_FILE, FILE_SIZE_K);
+ testBlock = blkList.get(0); // Use the first block to test
}
/**
@@ -101,8 +52,8 @@ public class TestClientBlockVerification
*/
@Test
public void testBlockVerification() throws Exception {
- BlockReader reader = spy(getBlockReader(0, FILE_SIZE_K * 1024));
- slurpReader(reader, FILE_SIZE_K * 1024, true);
+ BlockReader reader = spy(util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024));
+ util.readAndCheckEOS(reader, FILE_SIZE_K * 1024, true);
verify(reader).checksumOk(reader.dnSock);
reader.close();
}
@@ -112,8 +63,8 @@ public class TestClientBlockVerification
*/
@Test
public void testIncompleteRead() throws Exception {
- BlockReader reader = spy(getBlockReader(0, FILE_SIZE_K * 1024));
- slurpReader(reader, FILE_SIZE_K / 2 * 1024, false);
+ BlockReader reader = spy(util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024));
+ util.readAndCheckEOS(reader, FILE_SIZE_K / 2 * 1024, false);
// We asked the blockreader for the whole file, and only read
// half of it, so no checksumOk
@@ -129,9 +80,9 @@ public class TestClientBlockVerification
@Test
public void testCompletePartialRead() throws Exception {
// Ask for half the file
- BlockReader reader = spy(getBlockReader(0, FILE_SIZE_K * 1024 / 2));
+ BlockReader reader = spy(util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024 / 2));
// And read half the file
- slurpReader(reader, FILE_SIZE_K * 1024 / 2, true);
+ util.readAndCheckEOS(reader, FILE_SIZE_K * 1024 / 2, true);
verify(reader).checksumOk(reader.dnSock);
reader.close();
}
@@ -148,8 +99,8 @@ public class TestClientBlockVerification
for (int length : lengths) {
DFSClient.LOG.info("Testing startOffset = " + startOffset + " and " +
" len=" + length);
- BlockReader reader = spy(getBlockReader(startOffset, length));
- slurpReader(reader, length, true);
+ BlockReader reader = spy(util.getBlockReader(testBlock, startOffset, length));
+ util.readAndCheckEOS(reader, length, true);
verify(reader).checksumOk(reader.dnSock);
reader.close();
}
@@ -157,32 +108,9 @@ public class TestClientBlockVerification
}
- /**
- * Read the given length from the given block reader.
- *
- * @param expectEOF if true, will expect an eof response when done
- */
- private void slurpReader(BlockReader reader, int length, boolean expectEof)
- throws IOException {
- byte buf[] = new byte[1024];
- int nRead = 0;
- while (nRead < length) {
- DFSClient.LOG.info("So far read " + nRead + " - going to read more.");
- int n = reader.read(buf, 0, buf.length);
- assertTrue(n > 0);
- nRead += n;
- }
- DFSClient.LOG.info("Done reading, expect EOF for next read.");
- if (expectEof) {
- assertEquals(-1, reader.read(buf, 0, buf.length));
- }
- }
-
@AfterClass
public static void teardownCluster() throws Exception {
- if (cluster != null) {
- cluster.shutdown();
- }
+ util.shutdown();
}
-}
\ No newline at end of file
+}
Added: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataXceiver.java?rev=1036767&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataXceiver.java (added)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataXceiver.java Fri Nov 19 09:08:06 2010
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.BlockReader;
+import org.apache.hadoop.hdfs.BlockReaderTestUtil;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+
+import org.junit.Test;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.mockito.Mockito;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.never;
+
+public class TestDataXceiver {
+ static BlockReaderTestUtil util = null;
+ static final Path TEST_FILE = new Path("/test.file");
+ static final int FILE_SIZE_K = 256;
+ static LocatedBlock testBlock = null;
+
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ final int REPLICATION_FACTOR = 1;
+ util = new BlockReaderTestUtil(REPLICATION_FACTOR);
+ List<LocatedBlock> blkList = util.writeFile(TEST_FILE, FILE_SIZE_K);
+ testBlock = blkList.get(0); // Use the first block to test
+ }
+
+ /**
+ * Test that we don't call verifiedByClient() when the client only
+ * reads a partial block.
+ */
+ @Test
+ public void testCompletePartialRead() throws Exception {
+ // Ask for half the file
+ BlockReader reader = util.getBlockReader(testBlock, 0, FILE_SIZE_K * 1024 / 2);
+ DataNode dn = util.getDataNode(testBlock);
+ DataBlockScanner scanner = spy(dn.blockScanner);
+ dn.blockScanner = scanner;
+
+ // And read half the file
+ util.readAndCheckEOS(reader, FILE_SIZE_K * 1024 / 2, true);
+ verify(scanner, never()).verifiedByClient(Mockito.isA(Block.class));
+ reader.close();
+ }
+
+ @AfterClass
+ public static void teardownCluster() throws Exception {
+ util.shutdown();
+ }
+}