Posted to hdfs-commits@hadoop.apache.org by ha...@apache.org on 2010/03/23 07:00:12 UTC
svn commit: r926469 - in /hadoop/hdfs/trunk: ./
src/java/org/apache/hadoop/hdfs/server/datanode/
src/test/hdfs/org/apache/hadoop/hdfs/
src/test/unit/org/apache/hadoop/hdfs/server/datanode/
Author: hairong
Date: Tue Mar 23 06:00:11 2010
New Revision: 926469
URL: http://svn.apache.org/viewvc?rev=926469&view=rev
Log:
HDFS-520. Create new tests for block recovery. Contributed by Hairong.
Added:
hadoop/hdfs/trunk/src/test/unit/org/apache/hadoop/hdfs/server/datanode/
hadoop/hdfs/trunk/src/test/unit/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
Modified:
hadoop/hdfs/trunk/CHANGES.txt
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java
Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=926469&r1=926468&r2=926469&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Tue Mar 23 06:00:11 2010
@@ -544,6 +544,8 @@ Release 0.21.0 - Unreleased
HDFS-127. Reset failure count in DFSClient for each block acquiring
operation. (Igor Bolotin via szetszwo)
+ HDFS-520. Create new tests for block recovery. (hairong)
+
BUG FIXES
HDFS-76. Better error message to users when commands fail because of
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=926469&r1=926468&r2=926469&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Tue Mar 23 06:00:11 2010
@@ -228,6 +228,19 @@ public class DataNode extends Configured
*/
DataNode(final Configuration conf,
final AbstractList<File> dataDirs) throws IOException {
+ this(conf, dataDirs, (DatanodeProtocol)RPC.waitForProxy(DatanodeProtocol.class,
+ DatanodeProtocol.versionID,
+ NameNode.getAddress(conf),
+ conf));
+ }
+
+ /**
+ * Create the DataNode given a configuration, an array of dataDirs,
+ * and a namenode proxy
+ */
+ DataNode(final Configuration conf,
+ final AbstractList<File> dataDirs,
+ final DatanodeProtocol namenode) throws IOException {
super(conf);
UserGroupInformation.setConfiguration(conf);
@@ -238,7 +251,7 @@ public class DataNode extends Configured
DataNode.setDataNode(this);
try {
- startDataNode(conf, dataDirs);
+ startDataNode(conf, dataDirs, namenode);
} catch (IOException ie) {
shutdown();
throw ie;
@@ -256,7 +269,8 @@ public class DataNode extends Configured
* @throws IOException
*/
void startDataNode(Configuration conf,
- AbstractList<File> dataDirs
+ AbstractList<File> dataDirs,
+ DatanodeProtocol namenode
) throws IOException {
// use configured nameserver & interface to get local hostname
if (conf.get(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY) != null) {
@@ -287,11 +301,8 @@ public class DataNode extends Configured
this.dnRegistration = new DatanodeRegistration(machineName + ":" + tmpPort);
// connect to name node
- this.namenode = (DatanodeProtocol)
- RPC.waitForProxy(DatanodeProtocol.class,
- DatanodeProtocol.versionID,
- nameNodeAddr,
- conf);
+ this.namenode = namenode;
+
// get version and id info from the name-node
NamespaceInfo nsInfo = handshake();
StartupOption startOpt = getStartupOption(conf);
@@ -1589,7 +1600,7 @@ public class DataNode extends Configured
}
/** A convenient class used in block recovery */
- private static class BlockRecord {
+ static class BlockRecord {
final DatanodeID id;
final InterDatanodeProtocol datanode;
final ReplicaRecoveryInfo rInfo;
@@ -1650,7 +1661,7 @@ public class DataNode extends Configured
}
/** Block synchronization */
- private void syncBlock(RecoveringBlock rBlock,
+ void syncBlock(RecoveringBlock rBlock,
List<BlockRecord> syncList) throws IOException {
Block block = rBlock.getBlock();
long recoveryId = rBlock.getNewGenerationStamp();
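The DataNode change above is a testability refactoring: the original two-argument constructor now delegates to a new package-private constructor that takes the DatanodeProtocol proxy directly, and BlockRecord and syncBlock drop their private modifiers so a unit test in the same package can drive them. A minimal sketch of what the injectable constructor enables, assuming Mockito as the new TestBlockRecovery below does (the stubbed values here are illustrative placeholders):

    // Stand up a DataNode against a mocked namenode -- no NameNode RPC needed.
    DatanodeProtocol namenode = mock(DatanodeProtocol.class);
    // Startup handshake: hand back namespace info without a real namenode.
    when(namenode.versionRequest()).thenReturn(new NamespaceInfo(1, 1L, 1));
    // Heartbeats: return no commands so the DataNode stays idle during the test.
    when(namenode.sendHeartbeat(any(DatanodeRegistration.class), anyLong(),
        anyLong(), anyLong(), anyInt(), anyInt()))
        .thenReturn(new DatanodeCommand[0]);
    DataNode dn = new DataNode(conf, dirs, namenode); // the new constructor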
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java?rev=926469&r1=926468&r2=926469&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java Tue Mar 23 06:00:11 2010
@@ -17,146 +17,225 @@
*/
package org.apache.hadoop.hdfs;
+import static org.junit.Assert.*;
+
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.log4j.Level;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
-public class TestLeaseRecovery2 extends junit.framework.TestCase {
+public class TestLeaseRecovery2 {
{
((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
}
- static final long BLOCK_SIZE = 1024;
- static final int FILE_SIZE = 1024*16;
+ static final private long BLOCK_SIZE = 1024;
+ static final private int FILE_SIZE = (int)BLOCK_SIZE*2;
static final short REPLICATION_NUM = (short)3;
static byte[] buffer = new byte[FILE_SIZE];
static private String fakeUsername = "fakeUser1";
static private String fakeGroup = "supergroup";
- public void testBlockSynchronization() throws Exception {
- final long softLease = 1000;
- final long hardLease = 60 * 60 *1000;
- final short repl = 3;
- final Configuration conf = new HdfsConfiguration();
- final int bufferSize = conf.getInt("io.file.buffer.size", 4096);
+ static private MiniDFSCluster cluster;
+ static private DistributedFileSystem dfs;
+ final static private Configuration conf = new HdfsConfiguration();
+ final static private int BUF_SIZE = conf.getInt("io.file.buffer.size", 4096);
+
+ final static private long SHORT_LEASE_PERIOD = 1000L;
+ final static private long LONG_LEASE_PERIOD = 60*60*SHORT_LEASE_PERIOD;
+
+ /** start a dfs cluster
+ *
+ * @throws IOException
+ */
+ @BeforeClass
+ public static void startUp() throws IOException {
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
conf.setInt("dfs.heartbeat.interval", 1);
- // conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 16);
- // create fake mapping user to group and set it to the conf
- // NOTE. this must be done at the beginning, before first call to mapping
- // functions
+ cluster = new MiniDFSCluster(conf, 5, true, null);
+ cluster.waitActive();
+ dfs = (DistributedFileSystem)cluster.getFileSystem();
+ }
+
+ /**
+ * stop the cluster
+ * @throws IOException
+ */
+ @AfterClass
+ public static void tearDown() throws IOException {
+ IOUtils.closeStream(dfs);
+ if (cluster != null) {cluster.shutdown();}
+ }
+
+ /**
+ * This test stops the client from renewing its lease and also
+ * sets the hard lease expiration period to a short 1 second, thus
+ * triggering lease expiration while the client is still alive.
+ *
+ * The test makes sure that lease recovery completes and that the
+ * client fails if it continues to write to the file.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testHardLeaseRecovery() throws Exception {
+ // create a file
+ String filestr = "/hardLeaseRecovery";
+ AppendTestUtil.LOG.info("filestr=" + filestr);
+ Path filepath = new Path(filestr);
+ FSDataOutputStream stm = dfs.create(filepath, true,
+ BUF_SIZE, REPLICATION_NUM, BLOCK_SIZE);
+ assertTrue(dfs.dfs.exists(filestr));
+
+ // write bytes into the file.
+ int size = AppendTestUtil.nextInt(FILE_SIZE);
+ AppendTestUtil.LOG.info("size=" + size);
+ stm.write(buffer, 0, size);
+
+ // hflush file
+ AppendTestUtil.LOG.info("hflush");
+ stm.hflush();
+
+ // kill the lease renewal thread
+ AppendTestUtil.LOG.info("leasechecker.interruptAndJoin()");
+ dfs.dfs.leasechecker.interruptAndJoin();
+
+ // set the hard limit to be 1 second
+ cluster.setLeasePeriod(LONG_LEASE_PERIOD, SHORT_LEASE_PERIOD);
+
+ // wait for lease recovery to complete
+ LocatedBlocks locatedBlocks;
+ do {
+ Thread.sleep(SHORT_LEASE_PERIOD);
+ locatedBlocks = DFSClient.callGetBlockLocations(dfs.dfs.namenode,
+ filestr, 0L, size);
+ } while (locatedBlocks.isUnderConstruction());
+ assertEquals(size, locatedBlocks.getFileLength());
+
+ // make sure that the writer thread gets killed
+ try {
+ stm.write('b');
+ stm.close();
+ fail("Writer thread should have been killed");
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ // verify data
+ AppendTestUtil.LOG.info(
+ "File size is good. Now validating sizes from datanodes...");
+ AppendTestUtil.checkFullFile(dfs, filepath, size, buffer, filestr);
+ }
+
+ /**
+ * This test stops the client from renewing its lease and also
+ * sets the soft lease expiration period to a short 1 second. Soft
+ * lease expiration is then triggered immediately by having another
+ * client try to create the same file.
+ *
+ * The test makes sure that lease recovery completes.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testSoftLeaseRecovery() throws Exception {
Map<String, String []> u2g_map = new HashMap<String, String []>(1);
u2g_map.put(fakeUsername, new String[] {fakeGroup});
DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2g_map);
- MiniDFSCluster cluster = null;
- DistributedFileSystem dfs = null;
- byte[] actual = new byte[FILE_SIZE];
-
- try {
- cluster = new MiniDFSCluster(conf, 5, true, null);
- cluster.waitActive();
-
- //create a file
- dfs = (DistributedFileSystem)cluster.getFileSystem();
- // create a random file name
- String filestr = "/foo" + AppendTestUtil.nextInt();
- System.out.println("filestr=" + filestr);
- Path filepath = new Path(filestr);
- FSDataOutputStream stm = dfs.create(filepath, true,
- bufferSize, repl, BLOCK_SIZE);
- assertTrue(dfs.dfs.exists(filestr));
-
- // write random number of bytes into it.
- int size = AppendTestUtil.nextInt(FILE_SIZE);
- System.out.println("size=" + size);
- stm.write(buffer, 0, size);
-
- // hflush file
- AppendTestUtil.LOG.info("hflush");
- stm.hflush();
- AppendTestUtil.LOG.info("leasechecker.interruptAndJoin()");
- dfs.dfs.leasechecker.interruptAndJoin();
-
- // set the soft limit to be 1 second so that the
- // namenode triggers lease recovery on next attempt to write-for-open.
- cluster.setLeasePeriod(softLease, hardLease);
-
- // try to re-open the file before closing the previous handle. This
- // should fail but will trigger lease recovery.
- {
- UserGroupInformation ugi =
- UserGroupInformation.createUserForTesting(fakeUsername,
- new String [] { fakeGroup});
-
- FileSystem dfs2 = DFSTestUtil.getFileSystemAs(ugi, conf);
-
- boolean done = false;
- for(int i = 0; i < 10 && !done; i++) {
- AppendTestUtil.LOG.info("i=" + i);
- try {
- dfs2.create(filepath, false, bufferSize, repl, BLOCK_SIZE);
- fail("Creation of an existing file should never succeed.");
- } catch (IOException ioe) {
- final String message = ioe.getMessage();
- if (message.contains("file exists")) {
- AppendTestUtil.LOG.info("done", ioe);
- done = true;
- }
- else if (message.contains(AlreadyBeingCreatedException.class.getSimpleName())) {
- AppendTestUtil.LOG.info("GOOD! got " + message);
- }
- else {
- AppendTestUtil.LOG.warn("UNEXPECTED IOException", ioe);
- }
+ // create a file with a random name
+ String filestr = "/foo" + AppendTestUtil.nextInt();
+ AppendTestUtil.LOG.info("filestr=" + filestr);
+ Path filepath = new Path(filestr);
+ FSDataOutputStream stm = dfs.create(filepath, true,
+ BUF_SIZE, REPLICATION_NUM, BLOCK_SIZE);
+ assertTrue(dfs.dfs.exists(filestr));
+
+ // write random number of bytes into it.
+ int size = AppendTestUtil.nextInt(FILE_SIZE);
+ AppendTestUtil.LOG.info("size=" + size);
+ stm.write(buffer, 0, size);
+
+ // hflush file
+ AppendTestUtil.LOG.info("hflush");
+ stm.hflush();
+ AppendTestUtil.LOG.info("leasechecker.interruptAndJoin()");
+ dfs.dfs.leasechecker.interruptAndJoin();
+
+ // set the soft limit to be 1 second so that the
+ // namenode triggers lease recovery on next attempt to write-for-open.
+ cluster.setLeasePeriod(SHORT_LEASE_PERIOD, LONG_LEASE_PERIOD);
+
+ // try to re-open the file before closing the previous handle. This
+ // should fail but will trigger lease recovery.
+ {
+ UserGroupInformation ugi =
+ UserGroupInformation.createUserForTesting(fakeUsername,
+ new String [] { fakeGroup});
+
+ FileSystem dfs2 = DFSTestUtil.getFileSystemAs(ugi, conf);
+
+ boolean done = false;
+ for(int i = 0; i < 10 && !done; i++) {
+ AppendTestUtil.LOG.info("i=" + i);
+ try {
+ dfs2.create(filepath, false, BUF_SIZE, REPLICATION_NUM, BLOCK_SIZE);
+ fail("Creation of an existing file should never succeed.");
+ } catch (IOException ioe) {
+ final String message = ioe.getMessage();
+ if (message.contains("file exists")) {
+ AppendTestUtil.LOG.info("done", ioe);
+ done = true;
}
-
- if (!done) {
- AppendTestUtil.LOG.info("sleep " + 5000 + "ms");
- try {Thread.sleep(5000);} catch (InterruptedException e) {}
+ else if (message.contains(
+ AlreadyBeingCreatedException.class.getSimpleName())) {
+ AppendTestUtil.LOG.info("GOOD! got " + message);
+ }
+ else {
+ AppendTestUtil.LOG.warn("UNEXPECTED IOException", ioe);
}
}
- assertTrue(done);
- }
- AppendTestUtil.LOG.info("Lease for file " + filepath + " is recovered. "
- + "Validating its contents now...");
-
- // verify that file-size matches
- long fileSize = dfs.getFileStatus(filepath).getLen();
- assertTrue("File should be " + size + " bytes, but is actually " +
- " found to be " + fileSize + " bytes", fileSize == size);
-
- // verify that there is enough data to read.
- System.out.println("File size is good. Now validating sizes from datanodes...");
- FSDataInputStream stmin = dfs.open(filepath);
- stmin.readFully(0, actual, 0, size);
- stmin.close();
- }
- finally {
- try {
- if(dfs != null) dfs.close();
- if (cluster != null) {cluster.shutdown();}
- } catch (Exception e) {
- // ignore
+ if (!done) {
+ AppendTestUtil.LOG.info("sleep " + 5000 + "ms");
+ try {Thread.sleep(5000);} catch (InterruptedException e) {}
+ }
}
+ assertTrue(done);
}
+
+ AppendTestUtil.LOG.info("Lease for file " + filepath + " is recovered. "
+ + "Validating its contents now...");
+
+ // verify that file-size matches
+ long fileSize = dfs.getFileStatus(filepath).getLen();
+ assertTrue("File should be " + size + " bytes, but is actually " +
+ " found to be " + fileSize + " bytes", fileSize == size);
+
+ // verify data
+ AppendTestUtil.LOG.info("File size is good. " +
+ "Now validating data and sizes from datanodes...");
+ AppendTestUtil.checkFullFile(dfs, filepath, size, buffer, filestr);
}
}
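Both rewritten tests pivot on MiniDFSCluster.setLeasePeriod(soft, hard): testHardLeaseRecovery shrinks only the hard limit so the namenode initiates recovery by itself, while testSoftLeaseRecovery shrinks only the soft limit so a second client's create() attempt triggers it. A reusable wait helper distilled from the polling loop in testHardLeaseRecovery could look like the sketch below (it uses the same DFSClient internals the test already touches; the helper itself is hypothetical):

    // Poll the namenode until the file leaves the under-construction state,
    // i.e. until lease recovery has completed.
    private static void waitForRecovery(DistributedFileSystem dfs,
        String path, long len) throws IOException, InterruptedException {
      LocatedBlocks blocks;
      do {
        Thread.sleep(SHORT_LEASE_PERIOD);
        blocks = DFSClient.callGetBlockLocations(dfs.dfs.namenode,
            path, 0L, len);
      } while (blocks.isUnderConstruction());
    }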
Added: hadoop/hdfs/trunk/src/test/unit/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/unit/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java?rev=926469&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/test/unit/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java (added)
+++ hadoop/hdfs/trunk/src/test/unit/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java Tue Mar 23 06:00:11 2010
@@ -0,0 +1,316 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNode.BlockRecord;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import static org.mockito.Mockito.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+
+/**
+ * This tests whether syncing all replicas in block recovery works correctly.
+ */
+public class TestBlockRecovery {
+ private static final Log LOG = LogFactory.getLog(TestBlockRecovery.class);
+ private static final String DATA_DIR =
+ MiniDFSCluster.getBaseDirectory() + "data";
+ private DataNode dn;
+ private Configuration conf;
+ private final static long RECOVERY_ID = 3000L;
+ private final static long BLOCK_ID = 1000L;
+ private final static long GEN_STAMP = 2000L;
+ private final static long BLOCK_LEN = 3000L;
+ private final static long REPLICA_LEN1 = 6000L;
+ private final static long REPLICA_LEN2 = 5000L;
+ private final static Block block = new Block(BLOCK_ID, BLOCK_LEN, GEN_STAMP);
+
+ static {
+ ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
+ ((Log4JLogger)LOG).getLogger().setLevel(Level.ALL);
+ }
+
+ /**
+ * Starts an instance of DataNode
+ * @throws IOException
+ */
+ @Before
+ public void startUp() throws IOException {
+ conf = new HdfsConfiguration();
+ conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, DATA_DIR);
+ FileSystem.setDefaultUri(conf, "hdfs://localhost:5020");
+ ArrayList<File> dirs = new ArrayList<File>();
+ File dataDir = new File(DATA_DIR);
+ FileUtil.fullyDelete(dataDir);
+ dataDir.mkdirs();
+ dirs.add(dataDir);
+ DatanodeProtocol namenode = mock(DatanodeProtocol.class);
+ when(namenode.versionRequest()).thenReturn(new NamespaceInfo(1, 1L, 1));
+ when(namenode.sendHeartbeat(any(DatanodeRegistration.class), anyLong(),
+ anyLong(), anyLong(), anyInt(), anyInt())).thenReturn(
+ new DatanodeCommand[0]);
+ dn = new DataNode(conf, dirs, namenode);
+ }
+
+ /**
+ * Cleans up resources and shuts down the datanode instance
+ * @throws IOException if an error occurred
+ */
+ @After
+ public void tearDown() throws IOException {
+ if (dn != null) {
+ try {
+ dn.shutdown();
+ } catch(Exception e) {
+ LOG.error("Cannot close: ", e);
+ } finally {
+ File dir = new File(DATA_DIR);
+ if (dir.exists())
+ Assert.assertTrue(
+ "Cannot delete data-node dirs", FileUtil.fullyDelete(dir));
+ }
+ }
+ }
+
+ /** Sync two replicas */
+ private void testSyncReplicas(ReplicaRecoveryInfo replica1,
+ ReplicaRecoveryInfo replica2,
+ InterDatanodeProtocol dn1,
+ InterDatanodeProtocol dn2) throws IOException {
+
+ DatanodeInfo[] locs = new DatanodeInfo[]{
+ mock(DatanodeInfo.class), mock(DatanodeInfo.class)};
+ RecoveringBlock rBlock = new RecoveringBlock(block,
+ locs, RECOVERY_ID);
+ ArrayList<BlockRecord> syncList = new ArrayList<BlockRecord>(2);
+ BlockRecord record1 = new BlockRecord(
+ new DatanodeID("xx", "yy", 44, 55), dn1, replica1);
+ BlockRecord record2 = new BlockRecord(
+ new DatanodeID("aa", "bb", 11, 22), dn2, replica2);
+ syncList.add(record1);
+ syncList.add(record2);
+ dn.syncBlock(rBlock, syncList);
+ }
+
+ /**
+ * BlockRecovery_02.8.
+ * Two replicas are in Finalized state
+ * @throws IOException in case of an error
+ */
+ @Test
+ public void testFinalizedReplicas () throws IOException {
+ LOG.debug("Running " + GenericTestUtils.getMethodName());
+ ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
+ REPLICA_LEN1, GEN_STAMP-1, ReplicaState.FINALIZED);
+ ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
+ REPLICA_LEN1, GEN_STAMP-2, ReplicaState.FINALIZED);
+
+ InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class);
+ InterDatanodeProtocol dn2 = mock(InterDatanodeProtocol.class);
+
+ testSyncReplicas(replica1, replica2, dn1, dn2);
+ verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, REPLICA_LEN1);
+ verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, REPLICA_LEN1);
+
+ // two finalized replicas have different length
+ replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
+ REPLICA_LEN1, GEN_STAMP-1, ReplicaState.FINALIZED);
+ replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
+ REPLICA_LEN2, GEN_STAMP-2, ReplicaState.FINALIZED);
+
+ try {
+ testSyncReplicas(replica1, replica2, dn1, dn2);
+ Assert.fail("Two finalized replicas should not have different lengthes!");
+ } catch (IOException e) {
+ Assert.assertTrue(e.getMessage().startsWith(
+ "Inconsistent size of finalized replicas. "));
+ }
+ }
+
+ /**
+ * BlockRecovery_02.9.
+ * One replica is Finalized and another is RBW.
+ * @throws IOException in case of an error
+ */
+ @Test
+ public void testFinalizedRbwReplicas() throws IOException {
+ LOG.debug("Running " + GenericTestUtils.getMethodName());
+
+ // rbw and finalized replicas have the same length
+ ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
+ REPLICA_LEN1, GEN_STAMP-1, ReplicaState.FINALIZED);
+ ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
+ REPLICA_LEN1, GEN_STAMP-2, ReplicaState.RBW);
+
+ InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class);
+ InterDatanodeProtocol dn2 = mock(InterDatanodeProtocol.class);
+
+ testSyncReplicas(replica1, replica2, dn1, dn2);
+ verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, REPLICA_LEN1);
+ verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, REPLICA_LEN1);
+
+ // rbw replica has a different length from the finalized one
+ replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
+ REPLICA_LEN1, GEN_STAMP-1, ReplicaState.FINALIZED);
+ replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
+ REPLICA_LEN2, GEN_STAMP-2, ReplicaState.RBW);
+
+ dn1 = mock(InterDatanodeProtocol.class);
+ dn2 = mock(InterDatanodeProtocol.class);
+
+ testSyncReplicas(replica1, replica2, dn1, dn2);
+ verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, REPLICA_LEN1);
+ verify(dn2, never()).updateReplicaUnderRecovery(
+ block, RECOVERY_ID, REPLICA_LEN1);
+ }
+
+ /**
+ * BlockRecovery_02.10.
+ * One replica is Finalized and another is RWR.
+ * @throws IOException in case of an error
+ */
+ @Test
+ public void testFinalizedRwrReplicas() throws IOException {
+ LOG.debug("Running " + GenericTestUtils.getMethodName());
+
+ // rwr and finalized replicas have the same length
+ ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
+ REPLICA_LEN1, GEN_STAMP-1, ReplicaState.FINALIZED);
+ ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
+ REPLICA_LEN1, GEN_STAMP-2, ReplicaState.RWR);
+
+ InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class);
+ InterDatanodeProtocol dn2 = mock(InterDatanodeProtocol.class);
+
+ testSyncReplicas(replica1, replica2, dn1, dn2);
+ verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, REPLICA_LEN1);
+ verify(dn2, never()).updateReplicaUnderRecovery(
+ block, RECOVERY_ID, REPLICA_LEN1);
+
+ // rwr replica has a different length from the finalized one
+ replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
+ REPLICA_LEN1, GEN_STAMP-1, ReplicaState.FINALIZED);
+ replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
+ REPLICA_LEN2, GEN_STAMP-2, ReplicaState.RWR);
+
+ dn1 = mock(InterDatanodeProtocol.class);
+ dn2 = mock(InterDatanodeProtocol.class);
+
+ testSyncReplicas(replica1, replica2, dn1, dn2);
+ verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, REPLICA_LEN1);
+ verify(dn2, never()).updateReplicaUnderRecovery(
+ block, RECOVERY_ID, REPLICA_LEN1);
+ }
+
+ /**
+ * BlockRecovery_02.11.
+ * Two replicas are RBW.
+ * @throws IOException in case of an error
+ */
+ @Test
+ public void testRBWReplicas() throws IOException {
+ LOG.debug("Running " + GenericTestUtils.getMethodName());
+ ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
+ REPLICA_LEN1, GEN_STAMP-1, ReplicaState.RBW);
+ ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
+ REPLICA_LEN2, GEN_STAMP-2, ReplicaState.RBW);
+
+ InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class);
+ InterDatanodeProtocol dn2 = mock(InterDatanodeProtocol.class);
+
+ testSyncReplicas(replica1, replica2, dn1, dn2);
+ long minLen = Math.min(REPLICA_LEN1, REPLICA_LEN2);
+ verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, minLen);
+ verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, minLen);
+ }
+
+ /**
+ * BlockRecovery_02.12.
+ * One replica is RBW and another is RWR.
+ * @throws IOException in case of an error
+ */
+ @Test
+ public void testRBW_RWRReplicas() throws IOException {
+ LOG.debug("Running " + GenericTestUtils.getMethodName());
+ ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
+ REPLICA_LEN1, GEN_STAMP-1, ReplicaState.RBW);
+ ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
+ REPLICA_LEN1, GEN_STAMP-2, ReplicaState.RWR);
+
+ InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class);
+ InterDatanodeProtocol dn2 = mock(InterDatanodeProtocol.class);
+
+ testSyncReplicas(replica1, replica2, dn1, dn2);
+ verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, REPLICA_LEN1);
+ verify(dn2, never()).updateReplicaUnderRecovery(
+ block, RECOVERY_ID, REPLICA_LEN1);
+ }
+
+ /**
+ * BlockRecovery_02.13.
+ * Two replicas are RWR.
+ * @throws IOException in case of an error
+ */
+ @Test
+ public void testRWRReplicas() throws IOException {
+ LOG.debug("Running " + GenericTestUtils.getMethodName());
+ ReplicaRecoveryInfo replica1 = new ReplicaRecoveryInfo(BLOCK_ID,
+ REPLICA_LEN1, GEN_STAMP-1, ReplicaState.RWR);
+ ReplicaRecoveryInfo replica2 = new ReplicaRecoveryInfo(BLOCK_ID,
+ REPLICA_LEN2, GEN_STAMP-2, ReplicaState.RWR);
+
+ InterDatanodeProtocol dn1 = mock(InterDatanodeProtocol.class);
+ InterDatanodeProtocol dn2 = mock(InterDatanodeProtocol.class);
+
+ testSyncReplicas(replica1, replica2, dn1, dn2);
+
+ long minLen = Math.min(REPLICA_LEN1, REPLICA_LEN2);
+ verify(dn1).updateReplicaUnderRecovery(block, RECOVERY_ID, minLen);
+ verify(dn2).updateReplicaUnderRecovery(block, RECOVERY_ID, minLen);
+ }
+}
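Read together, the verify() expectations pin down the reconciliation rule that syncBlock applies to a pair of replicas: two FINALIZED replicas must agree on length; a FINALIZED replica outranks an RBW one, and both outrank RWR, with the higher-ranked replica's length winning; and two replicas in the same non-finalized state recover to the shorter of the two lengths. Below is a sketch of that rule as a standalone function, inferred from the test expectations above rather than copied from DataNode.syncBlock (the helper name and shape are illustrative, accessor names are assumed, and it leans on ReplicaState's numeric values, FINALIZED=0 < RBW=1 < RWR=2):

    // Decide the post-recovery block length for two replicas, mirroring the
    // expectations encoded in the tests above.
    static long recoveryLength(ReplicaRecoveryInfo r1, ReplicaRecoveryInfo r2)
        throws IOException {
      ReplicaState s1 = r1.getOriginalReplicaState();
      ReplicaState s2 = r2.getOriginalReplicaState();
      if (s1 == s2) {
        if (s1 == ReplicaState.FINALIZED) {
          // BlockRecovery_02.8: finalized replicas must have equal lengths
          if (r1.getNumBytes() != r2.getNumBytes())
            throw new IOException("Inconsistent size of finalized replicas. ");
          return r1.getNumBytes();
        }
        // BlockRecovery_02.11/02.13: same state, recover to the shorter length
        return Math.min(r1.getNumBytes(), r2.getNumBytes());
      }
      // Mixed states: the higher-ranked (numerically smaller) state wins
      return s1.getValue() < s2.getValue() ? r1.getNumBytes() : r2.getNumBytes();
    }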