You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by ha...@apache.org on 2009/10/01 01:39:33 UTC
svn commit: r820497 [6/7] - in /hadoop/hdfs/trunk: ./
.eclipse.templates/.launches/ src/contrib/hdfsproxy/
src/docs/src/documentation/content/xdocs/ src/java/
src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/
src/java/org/apach...
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java Wed Sep 30 23:39:30 2009
@@ -33,6 +33,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
@@ -78,6 +79,8 @@
private File base_dir;
private File data_dir;
+ public final static String FINALIZED_DIR_NAME = "/current/finalized/";
+
/**
* This null constructor is used only when wishing to start a data node cluster
@@ -635,7 +638,7 @@
if (i < 0 || i >= dataNodes.size())
return false;
for (int dn = i*2; dn < i*2+2; dn++) {
- File blockFile = new File(dataDir, "data" + (dn+1) + "/current/" +
+ File blockFile = new File(dataDir, "data" + (dn+1) + FINALIZED_DIR_NAME +
blockName);
System.out.println("Corrupting for: " + blockFile);
if (blockFile.exists()) {
@@ -853,7 +856,7 @@
* @param dataNodeIndex - data node whose block report is desired - the index is same as for getDataNodes()
* @return the block report for the specified data node
*/
- public Block[] getBlockReport(int dataNodeIndex) {
+ public Iterable<Block> getBlockReport(int dataNodeIndex) {
if (dataNodeIndex < 0 || dataNodeIndex > dataNodes.size()) {
throw new IndexOutOfBoundsException();
}
@@ -864,11 +867,11 @@
/**
*
* @return block reports from all data nodes
- * Block[] is indexed in the same order as the list of datanodes returned by getDataNodes()
+ * The returned array is indexed in the same order as the list of datanodes returned by getDataNodes()
*/
- public Block[][] getAllBlockReports() {
+ public Iterable<Block>[] getAllBlockReports() {
int numDataNodes = dataNodes.size();
- Block[][] result = new Block[numDataNodes][];
+ Iterable<Block>[] result = new BlockListAsLongs[numDataNodes];
for (int i = 0; i < numDataNodes; ++i) {
result[i] = getBlockReport(i);
}
@@ -885,7 +888,7 @@
* if any of blocks already exist in the data node
*
*/
- public void injectBlocks(int dataNodeIndex, Block[] blocksToInject) throws IOException {
+ public void injectBlocks(int dataNodeIndex, Iterable<Block> blocksToInject) throws IOException {
if (dataNodeIndex < 0 || dataNodeIndex > dataNodes.size()) {
throw new IndexOutOfBoundsException();
}
@@ -907,7 +910,7 @@
* if any of blocks already exist in the data nodes
* Note the rest of the blocks are not injected.
*/
- public void injectBlocks(Block[][] blocksToInject) throws IOException {
+ public void injectBlocks(Iterable<Block>[] blocksToInject) throws IOException {
if (blocksToInject.length > dataNodes.size()) {
throw new IndexOutOfBoundsException();
}
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestBlockMissingException.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestBlockMissingException.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestBlockMissingException.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestBlockMissingException.java Wed Sep 30 23:39:30 2009
@@ -123,14 +123,14 @@
* The Data directories for a datanode
*/
private File[] getDataNodeDirs(int i) throws IOException {
- File base_dir = new File(System.getProperty("test.build.data"), "dfs/");
+ String base_dir = MiniDFSCluster.getBaseDirectory();
File data_dir = new File(base_dir, "data");
File dir1 = new File(data_dir, "data"+(2*i+1));
File dir2 = new File(data_dir, "data"+(2*i+2));
if (dir1.isDirectory() && dir2.isDirectory()) {
File[] dir = new File[2];
- dir[0] = new File(dir1, "current");
- dir[1] = new File(dir2, "current");
+ dir[0] = new File(dir1, MiniDFSCluster.FINALIZED_DIR_NAME);
+ dir[1] = new File(dir2, MiniDFSCluster.FINALIZED_DIR_NAME);
return dir;
}
return new File[0];
Propchange: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestBlockMissingException.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestBlockReport.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestBlockReport.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestBlockReport.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestBlockReport.java Wed Sep 30 23:39:30 2009
@@ -103,15 +103,16 @@
// mock with newly created blocks
// I can't use DFSTestUtil.getAllBlocks(fs.open(filePath)) because it
// will keep the file open which will prevent the effect of the test
- Block[] blocks = locatedToBlocks(cluster.getNameNode().getBlockLocations(
+ ArrayList<Block> blocks =
+ locatedToBlocks(cluster.getNameNode().getBlockLocations(
filePath.toString(), FILE_START,
AppendTestUtil.FILE_SIZE).getLocatedBlocks(), null);
- LOG.info("Number of blocks allocated " + blocks.length);
- int[] newLengths = new int[blocks.length];
+ LOG.info("Number of blocks allocated " + blocks.size());
+ int[] newLengths = new int[blocks.size()];
int tempLen;
- for (int i = 0; i < blocks.length; i++) {
- Block b = blocks[i];
+ for (int i = 0; i < blocks.size(); i++) {
+ Block b = blocks.get(i);
LOG.debug("Block " + b.getBlockName() + " before\t" + "Size " +
b.getNumBytes());
LOG.debug("Setting new length");
@@ -123,7 +124,7 @@
}
cluster.getNameNode().blockReport(
cluster.listDataNodes()[DN_N0].dnRegistration,
- BlockListAsLongs.convertToArrayLongs(blocks));
+ new BlockListAsLongs(blocks, null).getBlockListAsLongs());
List<LocatedBlock> blocksAfterReport =
DFSTestUtil.getAllBlocks(fs.open(filePath));
@@ -175,7 +176,7 @@
for (Integer aRemovedIndex : removedIndex) {
blocks2Remove.add(lBlocks.get(aRemovedIndex).getBlock());
}
- Block[] blocks = locatedToBlocks(lBlocks, removedIndex);
+ ArrayList<Block> blocks = locatedToBlocks(lBlocks, removedIndex);
LOG.debug("Number of blocks allocated " + lBlocks.size());
@@ -196,7 +197,7 @@
cluster.getNameNode().blockReport(
cluster.listDataNodes()[DN_N0].dnRegistration,
- BlockListAsLongs.convertToArrayLongs(blocks));
+ new BlockListAsLongs(blocks, null).getBlockListAsLongs());
cluster.getNamesystem().computeDatanodeWork();
@@ -218,8 +219,8 @@
blocks2Remove.size(), cluster.getNamesystem().getUnderReplicatedBlocks());
}
- private Block[] locatedToBlocks(final List<LocatedBlock> locatedBlks,
- List<Integer> positionsToRemove) {
+ private ArrayList<Block> locatedToBlocks(final List<LocatedBlock> locatedBlks,
+ List<Integer> positionsToRemove) {
int substructLen = 0;
if (positionsToRemove != null) { // Need to allocated smaller array
substructLen = positionsToRemove.size();
@@ -233,7 +234,7 @@
}
newList.add(locatedBlks.get(i).getBlock());
}
- return newList.toArray(ret);
+ return newList;
}
private List<File> findAllFiles(File top, FilenameFilter mask) {
Propchange: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestBlockReport.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestBlocksScheduledCounter.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestBlocksScheduledCounter.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestBlocksScheduledCounter.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestBlocksScheduledCounter.java Wed Sep 30 23:39:30 2009
@@ -49,7 +49,7 @@
out.write(i);
}
// flush to make sure a block is allocated.
- ((DFSOutputStream)(out.getWrappedStream())).sync();
+ ((DFSOutputStream)(out.getWrappedStream())).hflush();
ArrayList<DatanodeDescriptor> dnList = new ArrayList<DatanodeDescriptor>();
cluster.getNamesystem().DFSNodesStatus(dnList, dnList);
Added: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java?rev=820497&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java (added)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java Wed Sep 30 23:39:30 2009
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSClient.DFSOutputStream;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.io.IOUtils;
+
+import org.junit.Test;
+import org.junit.Assert;
+
+/**
+ * This tests whether the client protocol calls related to pipeline recovery work correctly.
+ */
+public class TestClientProtocolForPipelineRecovery {
+
+ @Test public void testGetNewStamp() throws IOException {
+ int numDataNodes = 1;
+ Configuration conf = new Configuration();
+ conf.setBoolean("dfs.support.append", true);
+ MiniDFSCluster cluster = new MiniDFSCluster(conf, numDataNodes, true, null);
+ try {
+ cluster.waitActive();
+ FileSystem fileSys = cluster.getFileSystem();
+ NameNode namenode = cluster.getNameNode();
+
+ /* Test writing to finalized replicas */
+ Path file = new Path("dataprotocol.dat");
+ DFSTestUtil.createFile(fileSys, file, 1L, (short)numDataNodes, 0L);
+ // get the first blockid for the file
+ Block firstBlock = DFSTestUtil.getFirstBlock(fileSys, file);
+
+ // test updateBlockForPipeline on a finalized block
+ try {
+ namenode.updateBlockForPipeline(firstBlock, "");
+ Assert.fail("Can not get a new GS from a finalized block");
+ } catch (IOException e) {
+ Assert.assertTrue(e.getMessage().contains("is not under Construction"));
+ }
+
+ // test updateBlockForPipeline on a non-existent block
+ try {
+ long newBlockId = firstBlock.getBlockId() + 1;
+ Block newBlock = new Block(newBlockId, 0,
+ firstBlock.getGenerationStamp());
+ namenode.updateBlockForPipeline(newBlock, "");
+ Assert.fail("Cannot get a new GS from a non-existent block");
+ } catch (IOException e) {
+ Assert.assertTrue(e.getMessage().contains("does not exist"));
+ }
+
+
+ /* Test RBW replicas */
+ // change first block to a RBW
+ DFSOutputStream out = null;
+ try {
+ out = (DFSOutputStream)(fileSys.append(file).
+ getWrappedStream());
+ out.write(1);
+ out.hflush();
+ FSDataInputStream in = null;
+ try {
+ in = fileSys.open(file);
+ firstBlock = DFSTestUtil.getAllBlocks(in).get(0).getBlock();
+ } finally {
+ IOUtils.closeStream(in);
+ }
+
+ // test non-lease holder
+ DFSClient dfs = ((DistributedFileSystem)fileSys).dfs;
+ try {
+ namenode.updateBlockForPipeline(firstBlock, "test" + dfs.clientName);
+ Assert.fail("Cannot get a new GS for a non lease holder");
+ } catch (LeaseExpiredException e) {
+ Assert.assertTrue(e.getMessage().startsWith("Lease mismatch"));
+ }
+
+ // test null lease holder
+ try {
+ namenode.updateBlockForPipeline(firstBlock, null);
+ Assert.fail("Cannot get a new GS for a null lease holder");
+ } catch (LeaseExpiredException e) {
+ Assert.assertTrue(e.getMessage().startsWith("Lease mismatch"));
+ }
+
+ // test updateBlockForPipeline on a RBW block
+ namenode.updateBlockForPipeline(firstBlock, dfs.clientName);
+ } finally {
+ IOUtils.closeStream(out);
+ }
+ } finally {
+ cluster.shutdown();
+ }
+ }
+}
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java Wed Sep 30 23:39:30 2009
@@ -136,7 +136,7 @@
return versionID;
}
- public LocatedBlock addBlock(String src, String clientName)
+ public LocatedBlock addBlock(String src, String clientName, Block previous)
throws IOException
{
num_calls++;
@@ -169,7 +169,7 @@
public void abandonBlock(Block b, String src, String holder) throws IOException {}
- public boolean complete(String src, String clientName) throws IOException { return false; }
+ public boolean complete(String src, String clientName, Block last) throws IOException { return false; }
public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {}
@@ -215,6 +215,12 @@
public void setTimes(String src, long mtime, long atime) throws IOException {}
+ @Override public LocatedBlock updateBlockForPipeline(Block block,
+ String clientName) throws IOException { return null; }
+
+ @Override public void updatePipeline(String clientName, Block oldblock,
+ Block newBlock, DatanodeID[] newNodes)
+ throws IOException {}
}
public void testNotYetReplicatedErrors() throws IOException
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSShell.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSShell.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSShell.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSShell.java Wed Sep 30 23:39:30 2009
@@ -1094,7 +1094,7 @@
static List<File> getBlockFiles(MiniDFSCluster cluster) throws IOException {
List<File> files = new ArrayList<File>();
List<DataNode> datanodes = cluster.getDataNodes();
- Block[][] blocks = cluster.getAllBlockReports();
+ Iterable<Block>[] blocks = cluster.getAllBlockReports();
for(int i = 0; i < blocks.length; i++) {
FSDataset ds = (FSDataset)datanodes.get(i).getFSDataset();
for(Block b : blocks[i]) {
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java Wed Sep 30 23:39:30 2009
@@ -41,9 +41,12 @@
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSClient.DFSOutputStream;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
import org.apache.hadoop.io.IOUtils;
@@ -51,6 +54,7 @@
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessToken;
import org.apache.hadoop.util.DataChecksum;
+import org.junit.Test;
/**
* This tests data transfer protocol handling in the Datanode. It sends
@@ -94,6 +98,7 @@
DataInputStream in = new DataInputStream(sock.getInputStream());
out.write(sendBuf.toByteArray());
+ out.flush();
try {
in.readFully(retBuf);
} catch (EOFException eof) {
@@ -137,7 +142,175 @@
in.readFully(arr);
}
- public void testDataTransferProtocol() throws IOException {
+ private void writeZeroLengthPacket(Block block, String description)
+ throws IOException {
+ sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
+ sendOut.writeInt(512); // checksum size
+ sendOut.writeInt(8); // size of packet
+ sendOut.writeLong(block.getNumBytes()); // OffsetInBlock
+ sendOut.writeLong(100); // sequencenumber
+ sendOut.writeBoolean(true); // lastPacketInBlock
+
+ sendOut.writeInt(0); // chunk length
+ sendOut.writeInt(0); // zero checksum
+
+ //ok finally write a block with 0 len
+ SUCCESS.write(recvOut);
+ Text.writeString(recvOut, ""); // first bad node
+ recvOut.writeLong(100); // sequencenumber
+ SUCCESS.write(recvOut);
+ sendRecvData(description, false);
+ }
+
+ private void testWrite(Block block, BlockConstructionStage stage, long newGS,
+ String description, Boolean eofExcepted) throws IOException {
+ sendBuf.reset();
+ recvBuf.reset();
+ DataTransferProtocol.Sender.opWriteBlock(sendOut,
+ block.getBlockId(), block.getGenerationStamp(), 0,
+ stage, newGS, block.getNumBytes(), block.getNumBytes(), "cl", null,
+ new DatanodeInfo[1], AccessToken.DUMMY_TOKEN);
+ if (eofExcepted) {
+ ERROR.write(recvOut);
+ sendRecvData(description, true);
+ } else if (stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
+ //ok finally write a block with 0 len
+ SUCCESS.write(recvOut);
+ Text.writeString(recvOut, ""); // first bad node
+ sendRecvData(description, false);
+ } else {
+ writeZeroLengthPacket(block, description);
+ }
+ }
+
+ @Test public void testOpWrite() throws IOException {
+ int numDataNodes = 1;
+ Configuration conf = new Configuration();
+ conf.setBoolean("dfs.support.append", true);
+ MiniDFSCluster cluster = new MiniDFSCluster(conf, numDataNodes, true, null);
+ try {
+ cluster.waitActive();
+ datanode = cluster.getDataNodes().get(0).dnRegistration;
+ dnAddr = NetUtils.createSocketAddr(datanode.getName());
+ FileSystem fileSys = cluster.getFileSystem();
+
+ /* Test writing to finalized replicas */
+ Path file = new Path("dataprotocol.dat");
+ DFSTestUtil.createFile(fileSys, file, 1L, (short)numDataNodes, 0L);
+ // get the first blockid for the file
+ Block firstBlock = DFSTestUtil.getFirstBlock(fileSys, file);
+ // test PIPELINE_SETUP_CREATE on a finalized block
+ testWrite(firstBlock, BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L,
+ "Cannot create an existing block", true);
+ // test DATA_STREAMING on a finalized block
+ testWrite(firstBlock, BlockConstructionStage.DATA_STREAMING, 0L,
+ "Unexpected stage", true);
+ // test PIPELINE_SETUP_STREAMING_RECOVERY on an existing block
+ long newGS = firstBlock.getGenerationStamp() + 1;
+ testWrite(firstBlock,
+ BlockConstructionStage.PIPELINE_SETUP_STREAMING_RECOVERY,
+ newGS, "Cannot recover data streaming to a finalized replica", true);
+ // test PIPELINE_SETUP_APPEND on an existing block
+ newGS = firstBlock.getGenerationStamp() + 1;
+ testWrite(firstBlock,
+ BlockConstructionStage.PIPELINE_SETUP_APPEND,
+ newGS, "Append to a finalized replica", false);
+ firstBlock.setGenerationStamp(newGS);
+ // test PIPELINE_SETUP_APPEND_RECOVERY on an existing block
+ file = new Path("dataprotocol1.dat");
+ DFSTestUtil.createFile(fileSys, file, 1L, (short)numDataNodes, 0L);
+ firstBlock = DFSTestUtil.getFirstBlock(fileSys, file);
+ newGS = firstBlock.getGenerationStamp() + 1;
+ testWrite(firstBlock,
+ BlockConstructionStage.PIPELINE_SETUP_APPEND_RECOVERY, newGS,
+ "Recover appending to a finalized replica", false);
+ // test PIPELINE_CLOSE_RECOVERY on an existing block
+ file = new Path("dataprotocol2.dat");
+ DFSTestUtil.createFile(fileSys, file, 1L, (short)numDataNodes, 0L);
+ firstBlock = DFSTestUtil.getFirstBlock(fileSys, file);
+ newGS = firstBlock.getGenerationStamp() + 1;
+ testWrite(firstBlock,
+ BlockConstructionStage.PIPELINE_CLOSE_RECOVERY, newGS,
+ "Recover failed close to a finalized replica", false);
+ firstBlock.setGenerationStamp(newGS);
+
+ /* Test writing to a new block */
+ long newBlockId = firstBlock.getBlockId() + 1;
+ Block newBlock = new Block(newBlockId, 0,
+ firstBlock.getGenerationStamp());
+
+ // test PIPELINE_SETUP_CREATE on a new block
+ testWrite(newBlock, BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L,
+ "Create a new block", false);
+ // test PIPELINE_SETUP_STREAMING_RECOVERY on a new block
+ newGS = newBlock.getGenerationStamp() + 1;
+ newBlock.setBlockId(newBlock.getBlockId()+1);
+ testWrite(newBlock,
+ BlockConstructionStage.PIPELINE_SETUP_STREAMING_RECOVERY, newGS,
+ "Recover a new block", true);
+
+ // test PIPELINE_SETUP_APPEND on a new block
+ newGS = newBlock.getGenerationStamp() + 1;
+ testWrite(newBlock,
+ BlockConstructionStage.PIPELINE_SETUP_APPEND, newGS,
+ "Cannot append to a new block", true);
+
+ // test PIPELINE_SETUP_APPEND_RECOVERY on a new block
+ newBlock.setBlockId(newBlock.getBlockId()+1);
+ newGS = newBlock.getGenerationStamp() + 1;
+ testWrite(newBlock,
+ BlockConstructionStage.PIPELINE_SETUP_APPEND_RECOVERY, newGS,
+ "Cannot append to a new block", true);
+
+ /* Test writing to RBW replicas */
+ Path file1 = new Path("dataprotocol1.dat");
+ DFSTestUtil.createFile(fileSys, file1, 1L, (short)numDataNodes, 0L);
+ DFSOutputStream out = (DFSOutputStream)(fileSys.append(file1).
+ getWrappedStream());
+ out.write(1);
+ out.hflush();
+ FSDataInputStream in = fileSys.open(file1);
+ firstBlock = DFSTestUtil.getAllBlocks(in).get(0).getBlock();
+ firstBlock.setNumBytes(2L);
+
+ try {
+ // test PIPELINE_SETUP_CREATE on a RBW block
+ testWrite(firstBlock, BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L,
+ "Cannot create a RBW block", true);
+ // test PIPELINE_SETUP_APPEND on a RBW block
+ newGS = newBlock.getGenerationStamp() + 1;
+ testWrite(firstBlock, BlockConstructionStage.PIPELINE_SETUP_APPEND,
+ newGS, "Cannot append to a RBW replica", true);
+ // test PIPELINE_SETUP_APPEND_RECOVERY on a RBW block
+ testWrite(firstBlock,
+ BlockConstructionStage.PIPELINE_SETUP_APPEND_RECOVERY,
+ newGS, "Recover append to a RBW replica", false);
+ firstBlock.setGenerationStamp(newGS);
+ // test PIPELINE_SETUP_STREAMING_RECOVERY on a RBW block
+ file = new Path("dataprotocol2.dat");
+ DFSTestUtil.createFile(fileSys, file, 1L, (short)numDataNodes, 0L);
+ out = (DFSOutputStream)(fileSys.append(file).
+ getWrappedStream());
+ out.write(1);
+ out.hflush();
+ in = fileSys.open(file);
+ firstBlock = DFSTestUtil.getAllBlocks(in).get(0).getBlock();
+ firstBlock.setNumBytes(2L);
+ newGS = firstBlock.getGenerationStamp() + 1;
+ testWrite(firstBlock,
+ BlockConstructionStage.PIPELINE_SETUP_STREAMING_RECOVERY,
+ newGS, "Recover a RBW replica", false);
+ } finally {
+ IOUtils.closeStream(in);
+ IOUtils.closeStream(out);
+ }
+
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
+@Test public void testDataTransferProtocol() throws IOException {
Random random = new Random();
int oneMil = 1024*1024;
Path file = new Path("dataprotocol.dat");
@@ -146,6 +319,7 @@
Configuration conf = new Configuration();
conf.setInt("dfs.replication", numDataNodes);
MiniDFSCluster cluster = new MiniDFSCluster(conf, numDataNodes, true, null);
+ try {
cluster.waitActive();
DFSClient dfsClient = new DFSClient(
new InetSocketAddress("localhost", cluster.getNameNodePort()),
@@ -178,16 +352,10 @@
/* Test OP_WRITE_BLOCK */
sendBuf.reset();
- sendOut.writeShort((short)DataTransferProtocol.DATA_TRANSFER_VERSION);
- WRITE_BLOCK.write(sendOut);
- sendOut.writeLong(newBlockId); // block id
- sendOut.writeLong(0); // generation stamp
- sendOut.writeInt(0); // targets in pipeline
- sendOut.writeBoolean(false); // recoveryFlag
- Text.writeString(sendOut, "cl");// clientID
- sendOut.writeBoolean(false); // no src node info
- sendOut.writeInt(0); // number of downstream targets
- AccessToken.DUMMY_TOKEN.write(sendOut);
+ DataTransferProtocol.Sender.opWriteBlock(sendOut,
+ newBlockId, 0L, 0,
+ BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, 0L, 0L, "cl", null,
+ new DatanodeInfo[1], AccessToken.DUMMY_TOKEN);
sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
// bad bytes per checksum
@@ -198,32 +366,10 @@
sendBuf.reset();
recvBuf.reset();
- sendOut.writeShort((short)DataTransferProtocol.DATA_TRANSFER_VERSION);
- WRITE_BLOCK.write(sendOut);
- sendOut.writeLong(newBlockId);
- sendOut.writeLong(0); // generation stamp
- sendOut.writeInt(0); // targets in pipeline
- sendOut.writeBoolean(false); // recoveryFlag
- Text.writeString(sendOut, "cl");// clientID
- sendOut.writeBoolean(false); // no src node info
-
- // bad number of targets
- sendOut.writeInt(-1-random.nextInt(oneMil));
- ERROR.write(recvOut);
- sendRecvData("bad targets len while writing block " + newBlockId, true);
-
- sendBuf.reset();
- recvBuf.reset();
- sendOut.writeShort((short)DataTransferProtocol.DATA_TRANSFER_VERSION);
- WRITE_BLOCK.write(sendOut);
- sendOut.writeLong(++newBlockId);
- sendOut.writeLong(0); // generation stamp
- sendOut.writeInt(0); // targets in pipeline
- sendOut.writeBoolean(false); // recoveryFlag
- Text.writeString(sendOut, "cl");// clientID
- sendOut.writeBoolean(false); // no src node info
- sendOut.writeInt(0);
- AccessToken.DUMMY_TOKEN.write(sendOut);
+ DataTransferProtocol.Sender.opWriteBlock(sendOut,
+ ++newBlockId, 0L, 0,
+ BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, 0L, 0L, "cl", null,
+ new DatanodeInfo[1], AccessToken.DUMMY_TOKEN);
sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
sendOut.writeInt(512);
sendOut.writeInt(4); // size of packet
@@ -243,16 +389,10 @@
// test for writing a valid zero size block
sendBuf.reset();
recvBuf.reset();
- sendOut.writeShort((short)DataTransferProtocol.DATA_TRANSFER_VERSION);
- WRITE_BLOCK.write(sendOut);
- sendOut.writeLong(++newBlockId);
- sendOut.writeLong(0); // generation stamp
- sendOut.writeInt(0); // targets in pipeline
- sendOut.writeBoolean(false); // recoveryFlag
- Text.writeString(sendOut, "cl");// clientID
- sendOut.writeBoolean(false); // no src node info
- sendOut.writeInt(0);
- AccessToken.DUMMY_TOKEN.write(sendOut);
+ DataTransferProtocol.Sender.opWriteBlock(sendOut,
+ ++newBlockId, 0L, 0,
+ BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, 0L, 0L, "cl", null,
+ new DatanodeInfo[1], AccessToken.DUMMY_TOKEN);
sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
sendOut.writeInt(512); // checksum size
sendOut.writeInt(8); // size of packet
@@ -262,6 +402,7 @@
sendOut.writeInt(0); // chunk length
sendOut.writeInt(0); // zero checksum
+ sendOut.flush();
//ok finally write a block with 0 len
SUCCESS.write(recvOut);
Text.writeString(recvOut, ""); // first bad node
@@ -353,5 +494,8 @@
Text.writeString(sendOut, "cl");
AccessToken.DUMMY_TOKEN.write(sendOut);
readFile(fileSys, file, fileLen);
+ } finally {
+ cluster.shutdown();
+ }
}
}
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java Wed Sep 30 23:39:30 2009
@@ -134,8 +134,8 @@
File baseDir = new File(MiniDFSCluster.getBaseDirectory(), "data");
boolean corrupted = false;
for (int i=replica*2; i<replica*2+2; i++) {
- File blockFile = new File(baseDir, "data" + (i+1)+ "/current/" +
- blockName);
+ File blockFile = new File(baseDir, "data" + (i+1) +
+ MiniDFSCluster.FINALIZED_DIR_NAME + blockName);
if (blockFile.exists()) {
// Corrupt replica by writing random bytes into replica
RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
@@ -175,7 +175,7 @@
getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
blockCount = blocks.get(0).getLocations().length;
try {
- LOG.info("Looping until expected blockCount of 3 is received");
+ LOG.info("Looping until expected blockCount of 3 is received: " + blockCount);
Thread.sleep(1000);
} catch (InterruptedException ignore) {
}
@@ -194,7 +194,7 @@
getBlockLocations(file1.toString(), 0, Long.MAX_VALUE);
blockCount = blocks.get(0).getLocations().length;
try {
- LOG.info("Looping until expected blockCount of 2 is received");
+ LOG.info("Looping until expected blockCount of 2 is received: " + blockCount);
Thread.sleep(1000);
} catch (InterruptedException ignore) {
}
@@ -412,8 +412,8 @@
static boolean changeReplicaLength(String blockName, int dnIndex, int lenDelta) throws IOException {
File baseDir = new File(MiniDFSCluster.getBaseDirectory(), "data");
for (int i=dnIndex*2; i<dnIndex*2+2; i++) {
- File blockFile = new File(baseDir, "data" + (i+1)+ "/current/" +
- blockName);
+ File blockFile = new File(baseDir, "data" + (i+1) +
+ MiniDFSCluster.FINALIZED_DIR_NAME + blockName);
if (blockFile.exists()) {
RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
raFile.setLength(raFile.length()+lenDelta);
@@ -427,10 +427,10 @@
private static void waitForBlockDeleted(String blockName, int dnIndex)
throws IOException, InterruptedException {
File baseDir = new File(MiniDFSCluster.getBaseDirectory(), "data");
- File blockFile1 = new File(baseDir, "data" + (2*dnIndex+1)+ "/current/" +
- blockName);
- File blockFile2 = new File(baseDir, "data" + (2*dnIndex+2)+ "/current/" +
- blockName);
+ File blockFile1 = new File(baseDir, "data" + (2*dnIndex+1) +
+ MiniDFSCluster.FINALIZED_DIR_NAME + blockName);
+ File blockFile2 = new File(baseDir, "data" + (2*dnIndex+2) +
+ MiniDFSCluster.FINALIZED_DIR_NAME + blockName);
while (blockFile1.exists() || blockFile2.exists()) {
Thread.sleep(100);
}
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend.java Wed Sep 30 23:39:30 2009
@@ -148,7 +148,7 @@
Block b = blocks.get(i).getBlock();
System.out.println("testCopyOnWrite detaching block " + b);
assertTrue("Detaching block " + b + " should have returned true",
- dataset.detachBlock(b, 1));
+ dataset.unlinkBlock(b, 1));
}
// Since the blocks were already detached earlier, these calls should
@@ -158,7 +158,7 @@
Block b = blocks.get(i).getBlock();
System.out.println("testCopyOnWrite detaching block " + b);
assertTrue("Detaching block " + b + " should have returned false",
- !dataset.detachBlock(b, 1));
+ !dataset.unlinkBlock(b, 1));
}
} finally {
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java Wed Sep 30 23:39:30 2009
@@ -33,7 +33,6 @@
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.FSDataset;
-import org.apache.hadoop.hdfs.server.protocol.BlockMetaDataInfo;
/** This class implements some of tests posted in HADOOP-2658. */
public class TestFileAppend3 extends junit.framework.TestCase {
@@ -220,6 +219,7 @@
FSDataOutputStream out = fs.append(p);
final int len2 = (int)BLOCK_SIZE/2;
AppendTestUtil.write(out, len1, len2);
+ out.sync();
//c. Rename file to file.new.
final Path pnew = new Path(p + ".new");
@@ -250,7 +250,7 @@
}
for(DatanodeInfo datanodeinfo : lb.getLocations()) {
final DataNode dn = cluster.getDataNode(datanodeinfo.getIpcPort());
- final BlockMetaDataInfo metainfo = dn.getBlockMetaDataInfo(blk);
+ final Block metainfo = dn.data.getStoredBlock(blk.getBlockId());
assertEquals(size, metainfo.getNumBytes());
}
}
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCorruption.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCorruption.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCorruption.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCorruption.java Wed Sep 30 23:39:30 2009
@@ -110,11 +110,11 @@
// get the block
File dataDir = new File(cluster.getDataDirectory(),
- "data1/current");
+ "data1" + MiniDFSCluster.FINALIZED_DIR_NAME);
Block blk = getBlock(dataDir);
if (blk == null) {
blk = getBlock(new File(cluster.getDataDirectory(),
- "dfs/data/data2/current"));
+ "dfs/data/data2" + MiniDFSCluster.FINALIZED_DIR_NAME));
}
assertFalse(blk==null);
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java Wed Sep 30 23:39:30 2009
@@ -48,7 +48,6 @@
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.log4j.Level;
@@ -77,7 +76,7 @@
// the datanodes.
// creates a file but does not close it
- static FSDataOutputStream createFile(FileSystem fileSys, Path name, int repl)
+ public static FSDataOutputStream createFile(FileSystem fileSys, Path name, int repl)
throws IOException {
System.out.println("createFile: Created " + name + " with " + repl + " replica.");
FSDataOutputStream stm = fileSys.create(name, true,
@@ -96,7 +95,7 @@
//
// writes specified bytes to file.
//
- static void writeFile(FSDataOutputStream stm, int size) throws IOException {
+ public static void writeFile(FSDataOutputStream stm, int size) throws IOException {
byte[] buffer = AppendTestUtil.randomBytes(seed, size);
stm.write(buffer, 0, size);
}
@@ -447,9 +446,9 @@
System.out.println("testFileCreationError2: "
+ "The file has " + locations.locatedBlockCount() + " blocks.");
- // add another block to the file
+ // add one block to the file
LocatedBlock location = client.getNamenode().addBlock(file1.toString(),
- client.clientName);
+ client.clientName, null);
System.out.println("testFileCreationError2: "
+ "Added block " + location.getBlock());
Added: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestHFlush.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestHFlush.java?rev=820497&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestHFlush.java (added)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestHFlush.java Wed Sep 30 23:39:30 2009
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FSDataInputStream;
+import static org.junit.Assert.assertEquals;
+import org.junit.Test;
+
+import java.io.IOException;
+
+/** Class contains a set of tests to verify the correctness of
+ * newly introduced {@link DFSClient.DFSOutputStream#hflush()} method */
+public class TestHFlush {
+ private final String fName = "hflushtest.dat";
+
+ /** The test uses {@link #doTheJob(Configuration, String, long, short)}
+ * to write a file with a standard block size
+ */
+ @Test
+ public void hFlush_01() throws IOException {
+ doTheJob(new Configuration(), fName, AppendTestUtil.BLOCK_SIZE, (short)2);
+ }
+
+ /** The test uses {@link #doTheJob(Configuration, String, long, short)}
+ * to write a file with a custom block size so the writes will be
+ * happening across block boundaries
+ */
+ @Test
+ public void hFlush_02() throws IOException {
+ Configuration conf = new Configuration();
+ int customPerChecksumSize = 512;
+ int customBlockSize = customPerChecksumSize * 3;
+ // Modify default filesystem settings
+ conf.setInt("io.bytes.per.checksum", customPerChecksumSize);
+ conf.setLong("dfs.block.size", customBlockSize);
+
+ doTheJob(conf, fName, customBlockSize, (short)2);
+ }
+
+ /** The test uses {@link #doTheJob(Configuration, String, long, short)}
+ * to write a file with a custom block size so the writes will be
+ * happening across block and checksum boundaries
+ */
+ @Test
+ public void hFlush_03() throws IOException {
+ Configuration conf = new Configuration();
+ int customPerChecksumSize = 400;
+ int customBlockSize = customPerChecksumSize * 3;
+ // Modify default filesystem settings
+ conf.setInt("io.bytes.per.checksum", customPerChecksumSize);
+ conf.setLong("dfs.block.size", customBlockSize);
+
+ doTheJob(conf, fName, customBlockSize, (short)2);
+ }
+
+ /**
+ The method starts new cluster with defined Configuration;
+ creates a file with specified block_size and writes 10 equal sections in it;
+ it also calls hflush() after each write and throws an IOException in case of
+ an error.
+ @param conf cluster configuration
+ @param fileName of the file to be created and processed as required
+ @param block_size value to be used for the file's creation
+ @param replicas is the number of replicas
+ @throws IOException in case of any errors
+ */
+ public static void doTheJob(Configuration conf, final String fileName,
+ long block_size, short replicas) throws IOException {
+ byte[] fileContent;
+ final int SECTIONS = 10;
+
+ fileContent = AppendTestUtil.initBuffer(AppendTestUtil.FILE_SIZE);
+ MiniDFSCluster cluster = new MiniDFSCluster(conf, replicas, true, null);
+ // Make sure we work with DFS in order to utilize all its functionality
+ DistributedFileSystem fileSystem =
+ (DistributedFileSystem)cluster.getFileSystem();
+
+ FSDataInputStream is;
+ try {
+ Path path = new Path(fileName);
+ FSDataOutputStream stm = fileSystem.create(path, false, 4096, replicas,
+ block_size);
+ System.out.println("Created file " + fileName);
+
+ int tenth = AppendTestUtil.FILE_SIZE/SECTIONS;
+ int rounding = AppendTestUtil.FILE_SIZE - tenth * SECTIONS;
+ for (int i=0; i<SECTIONS; i++) {
+ System.out.println("Writing " + (tenth * i) + " to " + (tenth * (i+1)) + " section to file " + fileName);
+ // write to the file
+ stm.write(fileContent, tenth * i, tenth);
+ // Wait while hflush() pushes all packets through built pipeline
+ ((DFSClient.DFSOutputStream)stm.getWrappedStream()).hflush();
+ byte [] toRead = new byte[tenth];
+ byte [] expected = new byte[tenth];
+ System.arraycopy(fileContent, tenth * i, expected, 0, tenth);
+ // Open the same file for read. Need to create new reader after every write operation(!)
+ is = fileSystem.open(path);
+ is.read(toRead, tenth * i, tenth);
+ is.close();
+ checkData(toRead, 0, expected, "Partial verification");
+ }
+ System.out.println("Writing " + (tenth * SECTIONS) + " to " + (tenth * SECTIONS + rounding) + " section to file " + fileName);
+ stm.write(fileContent, tenth * SECTIONS, rounding);
+ stm.close();
+
+ assertEquals("File size doesn't match ", AppendTestUtil.FILE_SIZE, fileSystem.getFileStatus(path).getLen());
+ AppendTestUtil.checkFullFile(fileSystem, path, fileContent.length, fileContent, "hflush()");
+
+ } catch (IOException ioe) {
+ ioe.printStackTrace();
+ throw ioe;
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ fileSystem.close();
+ cluster.shutdown();
+ }
+ }
+ static void checkData(final byte[] actual, int from,
+ final byte[] expected, String message) {
+ for (int idx = 0; idx < actual.length; idx++) {
+ assertEquals(message+" byte "+(from+idx)+" differs. expected "+
+ expected[from+idx]+" actual "+actual[idx],
+ expected[from+idx], actual[idx]);
+ actual[idx] = 0;
+ }
+ }
+}
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestInjectionForSimulatedStorage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestInjectionForSimulatedStorage.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestInjectionForSimulatedStorage.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestInjectionForSimulatedStorage.java Wed Sep 30 23:39:30 2009
@@ -151,7 +151,7 @@
waitForBlockReplication(testFile, dfsClient.getNamenode(), numDataNodes, 20);
- Block[][] blocksList = cluster.getAllBlockReports();
+ Iterable<Block>[] blocksList = cluster.getAllBlockReports();
cluster.shutdown();
@@ -174,15 +174,14 @@
cluster.waitActive();
Set<Block> uniqueBlocks = new HashSet<Block>();
for (int i=0; i<blocksList.length; ++i) {
- for (int j=0; j < blocksList[i].length; ++j) {
- uniqueBlocks.add(blocksList[i][j]);
+ for (Block b : blocksList[i]) {
+ uniqueBlocks.add(new Block(b));
}
}
// Insert all the blocks in the first data node
LOG.info("Inserting " + uniqueBlocks.size() + " blocks");
- Block[] blocks = uniqueBlocks.toArray(new Block[uniqueBlocks.size()]);
- cluster.injectBlocks(0, blocks);
+ cluster.injectBlocks(0, uniqueBlocks);
dfsClient = new DFSClient(new InetSocketAddress("localhost",
cluster.getNameNodePort()),
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery.java Wed Sep 30 23:39:30 2009
@@ -18,7 +18,6 @@
package org.apache.hadoop.hdfs;
import java.io.IOException;
-import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -27,16 +26,15 @@
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.TestInterDatanodeProtocol;
-import org.apache.hadoop.hdfs.server.protocol.BlockMetaDataInfo;
-import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
public class TestLeaseRecovery extends junit.framework.TestCase {
static final int BLOCK_SIZE = 1024;
static final short REPLICATION_NUM = (short)3;
+ private static final long LEASE_PERIOD = 300L;
- static void checkMetaInfo(Block b, InterDatanodeProtocol idp
+ static void checkMetaInfo(Block b, DataNode dn
) throws IOException {
- TestInterDatanodeProtocol.checkMetaInfo(b, idp, null);
+ TestInterDatanodeProtocol.checkMetaInfo(b, dn);
}
static int min(Integer... x) {
@@ -49,6 +47,15 @@
return m;
}
+ void waitLeaseRecovery(MiniDFSCluster cluster) {
+ cluster.setLeasePeriod(LEASE_PERIOD, LEASE_PERIOD);
+ // wait for the lease to expire
+ try {
+ Thread.sleep(2 * 3000); // 2 heartbeat intervals
+ } catch (InterruptedException e) {
+ }
+ }
+
/**
* The following test first creates a file with a few blocks.
* It randomly truncates the replica of the last block stored in each datanode.
@@ -80,10 +87,8 @@
assertEquals(REPLICATION_NUM, datanodeinfos.length);
//connect to data nodes
- InterDatanodeProtocol[] idps = new InterDatanodeProtocol[REPLICATION_NUM];
DataNode[] datanodes = new DataNode[REPLICATION_NUM];
for(int i = 0; i < REPLICATION_NUM; i++) {
- idps[i] = DataNode.createInterDataNodeProtocolProxy(datanodeinfos[i], conf);
datanodes[i] = cluster.getDataNode(datanodeinfos[i].getIpcPort());
assertTrue(datanodes[i] != null);
}
@@ -92,44 +97,26 @@
Block lastblock = locatedblock.getBlock();
DataNode.LOG.info("newblocks=" + lastblock);
for(int i = 0; i < REPLICATION_NUM; i++) {
- checkMetaInfo(lastblock, idps[i]);
- }
-
- //setup random block sizes
- int lastblocksize = ORG_FILE_SIZE % BLOCK_SIZE;
- Integer[] newblocksizes = new Integer[REPLICATION_NUM];
- for(int i = 0; i < REPLICATION_NUM; i++) {
- newblocksizes[i] = AppendTestUtil.nextInt(lastblocksize);
+ checkMetaInfo(lastblock, datanodes[i]);
}
- DataNode.LOG.info("newblocksizes = " + Arrays.asList(newblocksizes));
- //update blocks with random block sizes
- Block[] newblocks = new Block[REPLICATION_NUM];
- for(int i = 0; i < REPLICATION_NUM; i++) {
- newblocks[i] = new Block(lastblock.getBlockId(), newblocksizes[i],
- lastblock.getGenerationStamp());
- idps[i].updateBlock(lastblock, newblocks[i], false);
- checkMetaInfo(newblocks[i], idps[i]);
- }
DataNode.LOG.info("dfs.dfs.clientName=" + dfs.dfs.clientName);
cluster.getNameNode().append(filestr, dfs.dfs.clientName);
- //block synchronization
- final int primarydatanodeindex = AppendTestUtil.nextInt(datanodes.length);
- DataNode.LOG.info("primarydatanodeindex =" + primarydatanodeindex);
- DataNode primary = datanodes[primarydatanodeindex];
- DataNode.LOG.info("primary.dnRegistration=" + primary.dnRegistration);
- primary.recoverBlocks(new Block[]{lastblock}, new DatanodeInfo[][]{datanodeinfos}).join();
-
- BlockMetaDataInfo[] updatedmetainfo = new BlockMetaDataInfo[REPLICATION_NUM];
- int minsize = min(newblocksizes);
- long currentGS = cluster.getNamesystem().getGenerationStamp();
- lastblock.setGenerationStamp(currentGS);
+ // expire lease to trigger block recovery.
+ waitLeaseRecovery(cluster);
+
+ Block[] updatedmetainfo = new Block[REPLICATION_NUM];
+ long oldSize = lastblock.getNumBytes();
+ lastblock = TestInterDatanodeProtocol.getLastLocatedBlock(
+ dfs.dfs.getNamenode(), filestr).getBlock();
+ long currentGS = lastblock.getGenerationStamp();
for(int i = 0; i < REPLICATION_NUM; i++) {
- updatedmetainfo[i] = idps[i].getBlockMetaDataInfo(lastblock);
+ updatedmetainfo[i] =
+ datanodes[i].data.getStoredBlock(lastblock.getBlockId());
assertEquals(lastblock.getBlockId(), updatedmetainfo[i].getBlockId());
- assertEquals(minsize, updatedmetainfo[i].getNumBytes());
+ assertEquals(oldSize, updatedmetainfo[i].getNumBytes());
assertEquals(currentGS, updatedmetainfo[i].getGenerationStamp());
}
}
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java Wed Sep 30 23:39:30 2009
@@ -56,6 +56,7 @@
// conf.setInt("io.bytes.per.checksum", 16);
MiniDFSCluster cluster = null;
+ DistributedFileSystem dfs = null;
byte[] actual = new byte[FILE_SIZE];
try {
@@ -63,7 +64,7 @@
cluster.waitActive();
//create a file
- DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
+ dfs = (DistributedFileSystem)cluster.getFileSystem();
// create a random file name
String filestr = "/foo" + AppendTestUtil.nextInt();
System.out.println("filestr=" + filestr);
@@ -129,10 +130,9 @@
+ "Validating its contents now...");
// verify that file-size matches
+ long fileSize = dfs.getFileStatus(filepath).getLen();
assertTrue("File should be " + size + " bytes, but is actually " +
- " found to be " + dfs.getFileStatus(filepath).getLen() +
- " bytes",
- dfs.getFileStatus(filepath).getLen() == size);
+ " found to be " + fileSize + " bytes", fileSize == size);
// verify that there is enough data to read.
System.out.println("File size is good. Now validating sizes from datanodes...");
@@ -142,6 +142,7 @@
}
finally {
try {
+ if(dfs != null) dfs.close();
if (cluster != null) {cluster.shutdown();}
} catch (Exception e) {
// ignore
Added: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestReadWhileWriting.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestReadWhileWriting.java?rev=820497&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestReadWhileWriting.java (added)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestReadWhileWriting.java Wed Sep 30 23:39:30 2009
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.log4j.Level;
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Test reading from hdfs while a file is being written. */
+public class TestReadWhileWriting {
+ {
+ ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
+ ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
+ }
+
+ private static final String DIR = "/"
+ + TestReadWhileWriting.class.getSimpleName() + "/";
+ private static final int BLOCK_SIZE = 8192;
+
+ /** Test reading while writing. */
+ @Test
+ public void testReadWhileWriting() throws Exception {
+ final Configuration conf = new Configuration();
+ //enable append
+ conf.setBoolean("dfs.support.append", true);
+
+ // create cluster
+ final MiniDFSCluster cluster = new MiniDFSCluster(conf, 3, true, null);
+ try {
+ //change the lease soft limit to 1 second.
+ final long leaseSoftLimit = 1000;
+ cluster.setLeasePeriod(leaseSoftLimit, FSConstants.LEASE_HARDLIMIT_PERIOD);
+
+ //wait for the cluster
+ cluster.waitActive();
+ final FileSystem fs = cluster.getFileSystem();
+ final Path p = new Path(DIR, "file1");
+ final int half = BLOCK_SIZE/2;
+
+ //a. On Machine M1, Create file. Write half block of data.
+ // Invoke (DFSOutputStream).hflush() on the dfs file handle.
+ // Do not close file yet.
+ {
+ final FSDataOutputStream out = fs.create(p, true,
+ fs.getConf().getInt("io.file.buffer.size", 4096),
+ (short)3, BLOCK_SIZE);
+ write(out, 0, half);
+
+ //hflush
+ ((DFSClient.DFSOutputStream)out.getWrappedStream()).hflush();
+ }
+
+ //b. On another machine M2, open file and verify that the half-block
+ // of data can be read successfully.
+ checkFile(p, half, conf);
+
+ /* TODO: enable the following when append is done.
+ //c. On M1, append another half block of data. Close file on M1.
+ {
+ //sleep to make sure the lease soft limit has expired.
+ Thread.sleep(2*leaseSoftLimit);
+
+ FSDataOutputStream out = fs.append(p);
+ write(out, 0, half);
+ out.close();
+ }
+
+ //d. On M2, open file and read 1 block of data from it. Close file.
+ checkFile(p, 2*half, conf);
+ */
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
+ static private int userCount = 0;
+ //check the file
+ static void checkFile(Path p, int expectedsize, Configuration conf
+ ) throws IOException {
+ //open the file with another user account
+ final Configuration conf2 = new Configuration(conf);
+ final String username = UserGroupInformation.getCurrentUGI().getUserName()
+ + "_" + ++userCount;
+ UnixUserGroupInformation.saveToConf(conf2,
+ UnixUserGroupInformation.UGI_PROPERTY_NAME,
+ new UnixUserGroupInformation(username, new String[]{"supergroup"}));
+ final FileSystem fs = FileSystem.get(conf2);
+ final InputStream in = fs.open(p);
+
+ //Is the data available?
+ Assert.assertTrue(available(in, expectedsize));
+
+ //Able to read?
+ for(int i = 0; i < expectedsize; i++) {
+ Assert.assertEquals((byte)i, (byte)in.read());
+ }
+
+ in.close();
+ }
+
+ /** Write something to a file */
+ private static void write(OutputStream out, int offset, int length
+ ) throws IOException {
+ final byte[] bytes = new byte[length];
+ for(int i = 0; i < length; i++) {
+ bytes[i] = (byte)(offset + i);
+ }
+ out.write(bytes);
+ }
+
+ /** Is the data available? */
+ private static boolean available(InputStream in, int expectedsize
+ ) throws IOException {
+ final int available = in.available();
+ System.out.println(" in.available()=" + available);
+ Assert.assertTrue(available >= 0);
+ Assert.assertTrue(available <= expectedsize);
+ return available == expectedsize;
+ }
+}
+
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestRenameWhileOpen.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestRenameWhileOpen.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestRenameWhileOpen.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestRenameWhileOpen.java Wed Sep 30 23:39:30 2009
@@ -36,6 +36,11 @@
((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
}
+ //TODO: un-comment checkFullFile once the lease recovery is done
+ private static void checkFullFile(FileSystem fs, Path p) throws IOException {
+ //TestFileCreation.checkFullFile(fs, p);
+ }
+
/**
* open /user/dir1/file1 /user/dir2/file2
* mkdir /user/dir3
@@ -114,7 +119,7 @@
assertTrue(!fs.exists(file1));
assertTrue(fs.exists(file2));
assertTrue(fs.exists(newfile));
- TestFileCreation.checkFullFile(fs, newfile);
+ checkFullFile(fs, newfile);
} finally {
fs.close();
cluster.shutdown();
@@ -186,7 +191,7 @@
assertTrue(!fs.exists(file1));
assertTrue(fs.exists(file2));
assertTrue(fs.exists(newfile));
- TestFileCreation.checkFullFile(fs, newfile);
+ checkFullFile(fs, newfile);
} finally {
fs.close();
cluster.shutdown();
@@ -250,7 +255,7 @@
Path newfile = new Path("/user/dir2", "file1");
assertTrue(!fs.exists(file1));
assertTrue(fs.exists(newfile));
- TestFileCreation.checkFullFile(fs, newfile);
+ checkFullFile(fs, newfile);
} finally {
fs.close();
cluster.shutdown();
@@ -312,7 +317,7 @@
Path newfile = new Path("/user", "dir2");
assertTrue(!fs.exists(file1));
assertTrue(fs.exists(newfile));
- TestFileCreation.checkFullFile(fs, newfile);
+ checkFullFile(fs, newfile);
} finally {
fs.close();
cluster.shutdown();
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestReplication.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestReplication.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestReplication.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestReplication.java Wed Sep 30 23:39:30 2009
@@ -291,7 +291,7 @@
* for under replicated blocks.
*
* It creates a file with one block and replication of 4. It corrupts
- * two of the blocks and removes one of the replicas. Expected behaviour is
+ * two of the blocks and removes one of the replicas. Expected behavior is
* that missing replica will be copied from one valid source.
*/
public void testPendingReplicationRetry() throws IOException {
@@ -341,7 +341,8 @@
int fileCount = 0;
for (int i=0; i<6; i++) {
- File blockFile = new File(baseDir, "data" + (i+1) + "/current/" + block);
+ File blockFile = new File(baseDir, "data" + (i+1) +
+ MiniDFSCluster.FINALIZED_DIR_NAME + block);
LOG.info("Checking for file " + blockFile);
if (blockFile.exists()) {
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java Wed Sep 30 23:39:30 2009
@@ -19,6 +19,7 @@
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Random;
@@ -169,7 +170,8 @@
cluster.waitActive();
client = DFSClient.createNamenode(conf);
- cluster.injectBlocks(blocksDN);
+ for(int i = 0; i < blocksDN.length; i++)
+ cluster.injectBlocks(i, Arrays.asList(blocksDN[i]));
long totalCapacity = 0L;
for(long capacity:capacities) {
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java Wed Sep 30 23:39:30 2009
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Random;
@@ -31,8 +32,12 @@
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
+import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.metrics.util.MBeanUtil;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
@@ -77,11 +82,14 @@
nullCrcFileData[i+2] = nullCrcHeader[i];
}
}
-
- private class BInfo { // information about a single block
+
+ // information about a single block
+ private class BInfo implements ReplicaInPipelineInterface {
Block theBlock;
private boolean finalized = false; // if not finalized => ongoing creation
SimulatedOutputStream oStream = null;
+ private long bytesAcked;
+ private long bytesRcvd;
BInfo(Block b, boolean forWriting) throws IOException {
theBlock = new Block(b);
if (theBlock.getNumBytes() < 0) {
@@ -102,26 +110,27 @@
}
}
- synchronized long getGenerationStamp() {
+ synchronized public long getGenerationStamp() {
return theBlock.getGenerationStamp();
}
synchronized void updateBlock(Block b) {
theBlock.setGenerationStamp(b.getGenerationStamp());
- setlength(b.getNumBytes());
+ setNumBytes(b.getNumBytes());
+ setBytesOnDisk(b.getNumBytes());
}
- synchronized long getlength() {
+ synchronized public long getNumBytes() {
if (!finalized) {
- return oStream.getLength();
+ return bytesRcvd;
} else {
return theBlock.getNumBytes();
}
}
- synchronized void setlength(long length) {
+ synchronized public void setNumBytes(long length) {
if (!finalized) {
- oStream.setLength(length);
+ bytesRcvd = length;
} else {
theBlock.setNumBytes(length);
}
@@ -170,7 +179,20 @@
oStream = null;
return;
}
-
+
+ synchronized void unfinalizeBlock() throws IOException {
+ if (!finalized) {
+ throw new IOException("Unfinalized a block that's not finalized "
+ + theBlock);
+ }
+ finalized = false;
+ oStream = new SimulatedOutputStream();
+ long blockLen = theBlock.getNumBytes();
+ oStream.setLength(blockLen);
+ bytesRcvd = blockLen;
+ bytesAcked = blockLen;
+ }
+
SimulatedInputStream getMetaIStream() {
return new SimulatedInputStream(nullCrcFileData);
}
@@ -178,6 +200,64 @@
synchronized boolean isFinalized() {
return finalized;
}
+
+ @Override
+ synchronized public BlockWriteStreams createStreams() throws IOException {
+ if (finalized) {
+ throw new IOException("Trying to write to a finalized replica "
+ + theBlock);
+ } else {
+ SimulatedOutputStream crcStream = new SimulatedOutputStream();
+ return new BlockWriteStreams(oStream, crcStream);
+ }
+ }
+
+ @Override
+ synchronized public long getBlockId() {
+ return theBlock.getBlockId();
+ }
+
+ @Override
+ synchronized public long getVisibleLength() {
+ return getBytesAcked();
+ }
+
+ @Override
+ public ReplicaState getState() {
+ return null;
+ }
+
+ @Override
+ synchronized public long getBytesAcked() {
+ if (finalized) {
+ return theBlock.getNumBytes();
+ } else {
+ return bytesAcked;
+ }
+ }
+
+ @Override
+ synchronized public void setBytesAcked(long bytesAcked) {
+ if (!finalized) {
+ this.bytesAcked = bytesAcked;
+ }
+ }
+
+ @Override
+ synchronized public long getBytesOnDisk() {
+ if (finalized) {
+ return theBlock.getNumBytes();
+ } else {
+ return oStream.getLength();
+ }
+ }
+
+ @Override
+ synchronized public void setBytesOnDisk(long bytesOnDisk) {
+ if (!finalized) {
+ oStream.setLength(bytesOnDisk);
+ }
+ }
}
static private class SimulatedStorage {
@@ -243,10 +323,12 @@
blockMap = new HashMap<Block,BInfo>();
}
- public synchronized void injectBlocks(Block[] injectBlocks)
+ public synchronized void injectBlocks(Iterable<Block> injectBlocks)
throws IOException {
if (injectBlocks != null) {
+ int numInjectedBlocks = 0;
for (Block b: injectBlocks) { // if any blocks in list is bad, reject list
+ numInjectedBlocks++;
if (b == null) {
throw new NullPointerException("Null blocks in block list");
}
@@ -255,12 +337,12 @@
}
}
HashMap<Block, BInfo> oldBlockMap = blockMap;
- blockMap =
- new HashMap<Block,BInfo>(injectBlocks.length + oldBlockMap.size());
+ blockMap = new HashMap<Block,BInfo>(
+ numInjectedBlocks + oldBlockMap.size());
blockMap.putAll(oldBlockMap);
for (Block b: injectBlocks) {
BInfo binfo = new BInfo(b, false);
- blockMap.put(b, binfo);
+ blockMap.put(binfo.theBlock, binfo);
}
}
}
@@ -280,7 +362,7 @@
}
}
- public synchronized Block[] getBlockReport() {
+ public synchronized BlockListAsLongs getBlockReport() {
Block[] blockTable = new Block[blockMap.size()];
int count = 0;
for (BInfo b : blockMap.values()) {
@@ -291,7 +373,8 @@
if (count != blockTable.length) {
blockTable = Arrays.copyOf(blockTable, count);
}
- return blockTable;
+ return new BlockListAsLongs(
+ new ArrayList<Block>(Arrays.asList(blockTable)), null);
}
public long getCapacity() throws IOException {
@@ -311,7 +394,12 @@
if (binfo == null) {
throw new IOException("Finalizing a non existing block " + b);
}
- return binfo.getlength();
+ return binfo.getNumBytes();
+ }
+
+ @Override
+ public Replica getReplica(long blockId) {
+ return blockMap.get(new Block(blockId));
}
/** {@inheritDoc} */
@@ -322,7 +410,7 @@
return null;
}
b.setGenerationStamp(binfo.getGenerationStamp());
- b.setNumBytes(binfo.getlength());
+ b.setNumBytes(binfo.getNumBytes());
return b;
}
@@ -350,7 +438,7 @@
DataNode.LOG.warn("Invalidate: Missing block");
continue;
}
- storage.free(binfo.getlength());
+ storage.free(binfo.getNumBytes());
blockMap.remove(b);
}
if (error) {
@@ -380,21 +468,89 @@
return getStorageInfo();
}
- public synchronized BlockWriteStreams writeToBlock(Block b,
- boolean isRecovery)
- throws IOException {
+ @Override
+ public synchronized ReplicaInPipelineInterface append(Block b,
+ long newGS, long expectedBlockLen) throws IOException {
+ BInfo binfo = blockMap.get(b);
+ if (binfo == null || !binfo.isFinalized()) {
+ throw new ReplicaNotFoundException("Block " + b
+ + " is not valid, and cannot be appended to.");
+ }
+ binfo.unfinalizeBlock();
+ return binfo;
+ }
+
+ @Override
+ public synchronized ReplicaInPipelineInterface recoverAppend(Block b,
+ long newGS, long expectedBlockLen) throws IOException {
+ BInfo binfo = blockMap.get(b);
+ if (binfo == null) {
+ throw new ReplicaNotFoundException("Block " + b
+ + " is not valid, and cannot be appended to.");
+ }
+ if (binfo.isFinalized()) {
+ binfo.unfinalizeBlock();
+ }
+ blockMap.remove(b);
+ binfo.theBlock.setGenerationStamp(newGS);
+ blockMap.put(binfo.theBlock, binfo);
+ return binfo;
+ }
+
+ @Override
+ public void recoverClose(Block b, long newGS,
+ long expectedBlockLen) throws IOException {
+ BInfo binfo = blockMap.get(b);
+ if (binfo == null) {
+ throw new ReplicaNotFoundException("Block " + b
+ + " is not valid, and cannot be appended to.");
+ }
+ if (!binfo.isFinalized()) {
+ binfo.finalizeBlock(binfo.getNumBytes());
+ }
+ blockMap.remove(b);
+ binfo.theBlock.setGenerationStamp(newGS);
+ blockMap.put(binfo.theBlock, binfo);
+ }
+
+ @Override
+ public synchronized ReplicaInPipelineInterface recoverRbw(Block b,
+ long newGS, long minBytesRcvd, long maxBytesRcvd) throws IOException {
+ BInfo binfo = blockMap.get(b);
+ if ( binfo == null) {
+ throw new ReplicaNotFoundException("Block " + b
+ + " does not exist, and cannot be appended to.");
+ }
+ if (binfo.isFinalized()) {
+ throw new ReplicaAlreadyExistsException("Block " + b
+ + " is valid, and cannot be written to.");
+ }
+ blockMap.remove(b);
+ binfo.theBlock.setGenerationStamp(newGS);
+ blockMap.put(binfo.theBlock, binfo);
+ return binfo;
+ }
+
+ @Override
+ public synchronized ReplicaInPipelineInterface createRbw(Block b)
+ throws IOException {
+ return createTemporary(b);
+ }
+
+ @Override
+ public synchronized ReplicaInPipelineInterface createTemporary(Block b)
+ throws IOException {
if (isValidBlock(b)) {
- throw new BlockAlreadyExistsException("Block " + b +
+ throw new ReplicaAlreadyExistsException("Block " + b +
" is valid, and cannot be written to.");
}
if (isBeingWritten(b)) {
- throw new BlockAlreadyExistsException("Block " + b +
+ throw new ReplicaAlreadyExistsException("Block " + b +
" is being written, and cannot be written to.");
}
- BInfo binfo = new BInfo(b, true);
- blockMap.put(b, binfo);
- SimulatedOutputStream crcStream = new SimulatedOutputStream();
- return new BlockWriteStreams(binfo.oStream, crcStream);
+ BInfo binfo = new BInfo(b, true);
+ blockMap.put(binfo.theBlock, binfo);
+ return binfo;
}
public synchronized InputStream getBlockInputStream(Block b)
@@ -483,7 +639,7 @@
if (binfo == null) {
throw new IOException("No such Block " + b );
}
- return binfo.getlength();
+ return binfo.getNumBytes();
}
public synchronized void setChannelPosition(Block b, BlockWriteStreams stream,
@@ -493,7 +649,7 @@
if (binfo == null) {
throw new IOException("No such Block " + b );
}
- binfo.setlength(dataOffset);
+ binfo.setBytesOnDisk(dataOffset);
}
/**
@@ -659,4 +815,23 @@
public boolean hasEnoughResource() {
return true;
}
+
+ @Override
+ public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock)
+ throws IOException {
+ return new ReplicaRecoveryInfo(rBlock.getBlock(), ReplicaState.FINALIZED);
+ }
+
+ @Override
+ public FinalizedReplica updateReplicaUnderRecovery(Block oldBlock,
+ long recoveryId,
+ long newlength) throws IOException {
+ return new FinalizedReplica(
+ oldBlock.getBlockId(), newlength, recoveryId, null, null);
+ }
+
+ @Override
+ public long getReplicaVisibleLength(Block block) throws IOException {
+ return block.getNumBytes();
+ }
}
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java Wed Sep 30 23:39:30 2009
@@ -31,13 +31,11 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
@@ -98,7 +96,7 @@
// fail the volume
// delete/make non-writable one of the directories (failed volume)
data_fail = new File(dataDir, "data3");
- failedDir = new File(data_fail, "current");
+ failedDir = new File(data_fail, MiniDFSCluster.FINALIZED_DIR_NAME);
if (failedDir.exists() &&
//!FileUtil.fullyDelete(failedDir)
!deteteBlocks(failedDir)
@@ -116,8 +114,8 @@
// make sure a block report is sent
DataNode dn = cluster.getDataNodes().get(1); //corresponds to dir data3
- cluster.getNameNode().blockReport(dn.dnRegistration,
- BlockListAsLongs.convertToArrayLongs(cluster.getBlockReport(1)));
+ long[] bReport = dn.getFSDataset().getBlockReport().getBlockListAsLongs();
+ cluster.getNameNode().blockReport(dn.dnRegistration, bReport);
// verify number of blocks and files...
verify(filename, filesize);
@@ -302,7 +300,7 @@
int total = 0;
for(int i=0; i<dn_num; i++) {
for(int j=1; j<=2; j++) {
- File dir = new File(new File(dataDir, "data"+(2*i+j)), "current");
+ File dir = new File(dataDir, "data"+(2*i+j)+MiniDFSCluster.FINALIZED_DIR_NAME);
if(dir == null) {
System.out.println("dir is null for dn=" + i + " and data_dir=" + j);
continue;
Added: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRestart.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRestart.java?rev=820497&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRestart.java (added)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRestart.java Wed Sep 30 23:39:30 2009
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
+import org.apache.hadoop.io.IOUtils;
+
+import org.junit.Test;
+import org.junit.Assert;
+
+/**
+ * Tests that a datanode correctly reloads the replicas it finds on disk
+ * after a restart: finalized replicas, rbw replicas and replicas that were
+ * left with unlink tmp files.
+ */
+public class TestDatanodeRestart {
+  // test finalized replicas persist across DataNode restarts
+  @Test public void testFinalizedReplicas() throws Exception {
+    // bring up a cluster of 3
+    Configuration conf = new Configuration();
+    conf.setLong("dfs.block.size", 1024L);
+    conf.setInt("dfs.write.packet.size", 512);
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 3, true, null);
+    try {
+      // wait inside the try so a failed startup still shuts the cluster down
+      cluster.waitActive();
+      FileSystem fs = cluster.getFileSystem();
+      // test finalized replicas
+      final String TopDir = "/test";
+      DFSTestUtil util = new DFSTestUtil("TestCrcCorruption", 2, 3, 8*1024);
+      util.createFiles(fs, TopDir, (short)3);
+      util.waitReplication(fs, TopDir, (short)3);
+      util.checkFiles(fs, TopDir);
+      cluster.restartDataNodes();
+      cluster.waitActive();
+      util.checkFiles(fs, TopDir);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  // test rbw replicas persist across DataNode restarts
+  @Test public void testRbwReplicas() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setLong("dfs.block.size", 1024L);
+    conf.setInt("dfs.write.packet.size", 512);
+    conf.setBoolean("dfs.support.append", true);
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
+    try {
+      cluster.waitActive();
+      testRbwReplicas(cluster, false);
+      testRbwReplicas(cluster, true);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Writes and syncs a file without closing it, optionally shortens the block
+   * files in the first datanode's "rbw" directories, restarts the datanodes
+   * and checks that the first datanode then holds exactly one replica, in
+   * state RWR and with the expected length.
+   *
+   * @param cluster running cluster to write to and restart
+   * @param isCorrupt whether to shorten the rbw block files before the restart
+   * @throws IOException if writing, corrupting or restarting fails
+   */
+  private void testRbwReplicas(MiniDFSCluster cluster, boolean isCorrupt)
+      throws IOException {
+    FSDataOutputStream out = null;
+    FileSystem fs = cluster.getFileSystem();
+    final Path src = new Path("/test.txt");
+    try {
+      final int fileLen = 515;
+      // create some rbw replicas on disk
+      byte[] writeBuf = new byte[fileLen];
+      new Random().nextBytes(writeBuf);
+      out = fs.create(src);
+      out.write(writeBuf);
+      out.sync();
+      DataNode dn = cluster.getDataNodes().get(0);
+      for (FSVolume volume : ((FSDataset)dn.data).volumes.volumes) {
+        File currentDir = volume.getDir().getParentFile();
+        File rbwDir = new File(currentDir, "rbw");
+        for (File file : rbwDir.listFiles()) {
+          if (isCorrupt && Block.isBlockFilename(file)) {
+            // corrupt: set the block file length to fileLen-1, and close the
+            // handle instead of leaking it across the datanode restart
+            RandomAccessFile raf = new RandomAccessFile(file, "rw");
+            try {
+              raf.setLength(fileLen-1);
+            } finally {
+              raf.close();
+            }
+          }
+        }
+      }
+      cluster.restartDataNodes();
+      cluster.waitActive();
+      dn = cluster.getDataNodes().get(0);
+
+      // check volumeMap: one rwr replica
+      ReplicasMap replicas = ((FSDataset)(dn.data)).volumeMap;
+      Assert.assertEquals(1, replicas.size());
+      ReplicaInfo replica = replicas.replicas().iterator().next();
+      Assert.assertEquals(ReplicaState.RWR, replica.getState());
+      if (isCorrupt) {
+        // expected length is fileLen-1 rounded down to a multiple of 512
+        Assert.assertEquals((fileLen-1)/512*512, replica.getNumBytes());
+      } else {
+        Assert.assertEquals(fileLen, replica.getNumBytes());
+      }
+      dn.data.invalidate(new Block[]{replica});
+    } finally {
+      IOUtils.closeStream(out);
+      if (fs.exists(src)) {
+        fs.delete(src, false);
+      }
+      fs.close();
+    }
+  }
+
+  // test recovering unlinked tmp replicas
+  @Test public void testRecoverReplicas() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setLong("dfs.block.size", 1024L);
+    conf.setInt("dfs.write.packet.size", 512);
+    conf.setBoolean("dfs.support.append", true);
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
+    try {
+      cluster.waitActive();
+      FileSystem fs = cluster.getFileSystem();
+      for (int i=0; i<4; i++) {
+        Path fileName = new Path("/test"+i);
+        DFSTestUtil.createFile(fs, fileName, 1, (short)1, 0L);
+        DFSTestUtil.waitReplication(fs, fileName, (short)1);
+      }
+      DataNode dn = cluster.getDataNodes().get(0);
+      Iterator<ReplicaInfo> replicasItor =
+          ((FSDataset)dn.data).volumeMap.replicas().iterator();
+      ReplicaInfo replica = replicasItor.next();
+      createUnlinkTmpFile(replica, true, true); // rename block file
+      createUnlinkTmpFile(replica, false, true); // rename meta file
+      replica = replicasItor.next();
+      createUnlinkTmpFile(replica, true, false); // copy block file
+      createUnlinkTmpFile(replica, false, false); // copy meta file
+      replica = replicasItor.next();
+      createUnlinkTmpFile(replica, true, true); // rename block file
+      createUnlinkTmpFile(replica, false, false); // copy meta file
+
+      cluster.restartDataNodes();
+      cluster.waitActive();
+      dn = cluster.getDataNodes().get(0);
+
+      // check volumeMap: 4 finalized replicas
+      Collection<ReplicaInfo> replicas =
+          ((FSDataset)(dn.data)).volumeMap.replicas();
+      Assert.assertEquals(4, replicas.size());
+      replicasItor = replicas.iterator();
+      while (replicasItor.hasNext()) {
+        Assert.assertEquals(ReplicaState.FINALIZED,
+            replicasItor.next().getState());
+      }
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Places the block file or the meta file of a replica at the path returned
+   * by {@code FSDataset.getUnlinkTmpFile}, either by renaming or by copying.
+   *
+   * @param replicaInfo replica whose file is renamed or copied
+   * @param changeBlockFile true for the block file, false for the meta file
+   * @param isRename true to rename the file, false to copy it
+   * @throws IOException if the rename fails or the copy cannot be completed
+   */
+  private static void createUnlinkTmpFile(ReplicaInfo replicaInfo,
+      boolean changeBlockFile,
+      boolean isRename) throws IOException {
+    File src;
+    if (changeBlockFile) {
+      src = replicaInfo.getBlockFile();
+    } else {
+      src = replicaInfo.getMetaFile();
+    }
+    File dst = FSDataset.getUnlinkTmpFile(src);
+    if (isRename) {
+      // File.renameTo signals failure only through its return value
+      if (!src.renameTo(dst)) {
+        throw new IOException("Failed to rename " + src + " to " + dst);
+      }
+    } else {
+      FileInputStream in = new FileInputStream(src);
+      try {
+        FileOutputStream out = new FileOutputStream(dst);
+        try {
+          IOUtils.copyBytes(in, out, 1);
+        } finally {
+          out.close();
+        }
+      } finally {
+        in.close();
+      }
+    }
+  }
+}