You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ki...@apache.org on 2015/02/12 16:17:28 UTC
hadoop git commit: HDFS-7704. DN heartbeat to Active NN may be
blocked and expire if connection to Standby NN continues to time out.
Contributed by Rushabh Shah. (cherry picked from commit
38262779bbf38a427bad6d044e91165567f1d206)
Repository: hadoop
Updated Branches:
refs/heads/branch-2 dc14f2772 -> 39594301e
HDFS-7704. DN heartbeat to Active NN may be blocked and expire if connection to Standby NN continues to time out. Contributed by Rushabh Shah.
(cherry picked from commit 38262779bbf38a427bad6d044e91165567f1d206)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/39594301
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/39594301
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/39594301
Branch: refs/heads/branch-2
Commit: 39594301e1ba772954059aa488ce7f14e203487f
Parents: dc14f27
Author: Kihwal Lee <ki...@apache.org>
Authored: Thu Feb 12 09:17:10 2015 -0600
Committer: Kihwal Lee <ki...@apache.org>
Committed: Thu Feb 12 09:17:10 2015 -0600
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../hdfs/server/datanode/BPOfferService.java | 8 +-
.../hdfs/server/datanode/BPServiceActor.java | 59 +++----
.../server/datanode/TestBPOfferService.java | 157 +++++++++++++++++++
4 files changed, 196 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/39594301/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index f57f074..54ad016 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -626,6 +626,9 @@ Release 2.7.0 - UNRELEASED
HDFS-7771. fuse_dfs should permit FILE: on the front of KRB5CCNAME
(cmccabe)
+ HDFS-7704. DN heartbeat to Active NN may be blocked and expire if
+ connection to Standby NN continues to time out (Rushabh Shah via kihwal)
+
Release 2.6.1 - UNRELEASED
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/39594301/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
index 1455a8a..ca00941 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
@@ -218,7 +218,9 @@ class BPOfferService {
String storageUuid, StorageType storageType) {
checkBlock(block);
for (BPServiceActor actor : bpServices) {
- actor.reportBadBlocks(block, storageUuid, storageType);
+ ReportBadBlockAction rbbAction = new ReportBadBlockAction
+ (block, storageUuid, storageType);
+ actor.bpThreadEnqueue(rbbAction);
}
}
@@ -415,7 +417,9 @@ class BPOfferService {
*/
void trySendErrorReport(int errCode, String errMsg) {
for (BPServiceActor actor : bpServices) {
- actor.trySendErrorReport(errCode, errMsg);
+ ErrorReportAction errorReportAction = new ErrorReportAction
+ (errCode, errMsg);
+ actor.bpThreadEnqueue(errorReportAction);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/39594301/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index c344027..6529a6b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -25,6 +25,7 @@ import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -34,7 +35,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.hdfs.client.BlockReportOptions;
import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -120,6 +120,8 @@ class BPServiceActor implements Runnable {
private final DNConf dnConf;
private DatanodeRegistration bpRegistration;
+ final LinkedList<BPServiceActorAction> bpThreadQueue
+ = new LinkedList<BPServiceActorAction>();
BPServiceActor(InetSocketAddress nnAddr, BPOfferService bpos) {
this.bpos = bpos;
@@ -254,26 +256,6 @@ class BPServiceActor implements Runnable {
resetBlockReportTime = true; // reset future BRs for randomness
}
- void reportBadBlocks(ExtendedBlock block,
- String storageUuid, StorageType storageType) {
- if (bpRegistration == null) {
- return;
- }
- DatanodeInfo[] dnArr = { new DatanodeInfo(bpRegistration) };
- String[] uuids = { storageUuid };
- StorageType[] types = { storageType };
- LocatedBlock[] blocks = { new LocatedBlock(block, dnArr, uuids, types) };
-
- try {
- bpNamenode.reportBadBlocks(blocks);
- } catch (IOException e){
- /* One common reason is that NameNode could be in safe mode.
- * Should we keep on retrying in that case?
- */
- LOG.warn("Failed to report bad block " + block + " to namenode : "
- + " Exception", e);
- }
- }
/**
* Report received blocks and delete hints to the Namenode for each
@@ -777,6 +759,7 @@ class BPServiceActor implements Runnable {
} catch (IOException e) {
LOG.warn("IOException in offerService", e);
}
+ processQueueMessages();
} // while (shouldRun())
} // offerService
@@ -915,14 +898,6 @@ class BPServiceActor implements Runnable {
return true;
}
- void trySendErrorReport(int errCode, String errMsg) {
- try {
- bpNamenode.errorReport(bpRegistration, errCode, errMsg);
- } catch(IOException e) {
- LOG.warn("Error reporting an error to NameNode " + nnAddr,
- e);
- }
- }
/**
* Report a bad block from another DN in this cluster.
@@ -1024,4 +999,30 @@ class BPServiceActor implements Runnable {
}
}
}
+
+ public void bpThreadEnqueue(BPServiceActorAction action) {
+ synchronized (bpThreadQueue) {
+ if (!bpThreadQueue.contains(action)) {
+ bpThreadQueue.add(action);
+ }
+ }
+ }
+
+ private void processQueueMessages() {
+ LinkedList<BPServiceActorAction> duplicateQueue;
+ synchronized (bpThreadQueue) {
+ duplicateQueue = new LinkedList<BPServiceActorAction>(bpThreadQueue);
+ bpThreadQueue.clear();
+ }
+ while (!duplicateQueue.isEmpty()) {
+ BPServiceActorAction actionItem = duplicateQueue.remove();
+ try {
+ actionItem.reportTo(bpNamenode, bpRegistration);
+ } catch (BPServiceActorActionException baae) {
+ LOG.warn(baae.getMessage() + nnAddr , baae);
+ // Adding it back to the queue if not present
+ bpThreadEnqueue(actionItem);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/39594301/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
index 7171c49..e21ce38 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
@@ -21,6 +21,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
@@ -37,6 +38,7 @@ import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
@@ -53,6 +55,7 @@ import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.PathUtils;
+import org.apache.hadoop.util.Time;
import org.apache.log4j.Level;
import org.junit.Before;
import org.junit.Test;
@@ -74,6 +77,8 @@ public class TestBPOfferService {
private static final ExtendedBlock FAKE_BLOCK =
new ExtendedBlock(FAKE_BPID, 12345L);
private static final File TEST_BUILD_DATA = PathUtils.getTestDir(TestBPOfferService.class);
+ private long firstCallTime = 0;
+ private long secondCallTime = 0;
static {
((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
@@ -458,4 +463,156 @@ public class TestBPOfferService {
return captor.getValue()[0].getBlocks();
}
+ private void setTimeForSynchronousBPOSCalls() {
+ if (firstCallTime == 0) {
+ firstCallTime = Time.now();
+ } else {
+ secondCallTime = Time.now();
+ }
+ }
+
+ private class BPOfferServiceSynchronousCallAnswer implements Answer<Void> {
+ private final int nnIdx;
+
+ public BPOfferServiceSynchronousCallAnswer(int nnIdx) {
+ this.nnIdx = nnIdx;
+ }
+
+ // For the active namenode we record the call time; for the standby
+ // namenode we sleep for 5 seconds (this simulates the situation
+ // where the standby namenode is down).
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ if (nnIdx == 0) {
+ setTimeForSynchronousBPOSCalls();
+ } else {
+ Thread.sleep(5000);
+ }
+ return null;
+ }
+ }
+
+ /**
+ * This test case tests the {@link BPOfferService#reportBadBlocks} method,
+ * verifying that if the call to the standby namenode times out, it does
+ * not affect heartbeat processing for the active namenode, since these
+ * calls are made under the writeLock.
+ * @throws Exception
+ */
+ @Test
+ public void testReportBadBlockWhenStandbyNNTimesOut() throws Exception {
+ BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2);
+ bpos.start();
+ try {
+ waitForInitialization(bpos);
+ // Should start with neither NN as active.
+ assertNull(bpos.getActiveNN());
+ // Have NN1 claim active at txid 1
+ mockHaStatuses[0] = new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1);
+ bpos.triggerHeartbeatForTests();
+ // Now mockNN1 is acting like active namenode and mockNN2 as Standby
+ assertSame(mockNN1, bpos.getActiveNN());
+ Mockito.doAnswer(new BPOfferServiceSynchronousCallAnswer(0))
+ .when(mockNN1).reportBadBlocks(Mockito.any(LocatedBlock[].class));
+ Mockito.doAnswer(new BPOfferServiceSynchronousCallAnswer(1))
+ .when(mockNN2).reportBadBlocks(Mockito.any(LocatedBlock[].class));
+ bpos.reportBadBlocks(FAKE_BLOCK, mockFSDataset.getVolume(FAKE_BLOCK)
+ .getStorageID(), mockFSDataset.getVolume(FAKE_BLOCK)
+ .getStorageType());
+ bpos.reportBadBlocks(FAKE_BLOCK, mockFSDataset.getVolume(FAKE_BLOCK)
+ .getStorageID(), mockFSDataset.getVolume(FAKE_BLOCK)
+ .getStorageType());
+ Thread.sleep(10000);
+ long difference = secondCallTime - firstCallTime;
+ assertTrue("Active namenode reportBadBlock processing should be "
+ + "independent of standby namenode reportBadBlock processing ",
+ difference < 5000);
+ } finally {
+ bpos.stop();
+ }
+ }
+
+ /**
+ * This test case tests the {@link BPOfferService#trySendErrorReport} method,
+ * verifying that if the call to the standby namenode times out, it does
+ * not affect heartbeat processing for the active namenode, since these
+ * calls are made under the writeLock.
+ * @throws Exception
+ */
+ @Test
+ public void testTrySendErrorReportWhenStandbyNNTimesOut() throws Exception {
+ BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2);
+ bpos.start();
+ try {
+ waitForInitialization(bpos);
+ // Should start with neither NN as active.
+ assertNull(bpos.getActiveNN());
+ // Have NN1 claim active at txid 1
+ mockHaStatuses[0] = new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1);
+ bpos.triggerHeartbeatForTests();
+ // Now mockNN1 is acting like active namenode and mockNN2 as Standby
+ assertSame(mockNN1, bpos.getActiveNN());
+ Mockito.doAnswer(new BPOfferServiceSynchronousCallAnswer(0))
+ .when(mockNN1).errorReport(Mockito.any(DatanodeRegistration.class),
+ Mockito.anyInt(), Mockito.anyString());
+ Mockito.doAnswer(new BPOfferServiceSynchronousCallAnswer(1))
+ .when(mockNN2).errorReport(Mockito.any(DatanodeRegistration.class),
+ Mockito.anyInt(), Mockito.anyString());
+ String errorString = "Can't send invalid block " + FAKE_BLOCK;
+ bpos.trySendErrorReport(DatanodeProtocol.INVALID_BLOCK, errorString);
+ bpos.trySendErrorReport(DatanodeProtocol.INVALID_BLOCK, errorString);
+ Thread.sleep(10000);
+ long difference = secondCallTime - firstCallTime;
+ assertTrue("Active namenode trySendErrorReport processing "
+ + "should be independent of standby namenode trySendErrorReport"
+ + " processing ", difference < 5000);
+ } finally {
+ bpos.stop();
+ }
+ }
+ /**
+ * This test case tests whether {@link BPServiceActor#processQueueMessages}
+ * adds the error report back to the queue when
+ * {@link BPServiceActorAction#reportTo} throws an IOException.
+ * @throws Exception
+ */
+ @Test
+ public void testTrySendErrorReportWhenNNThrowsIOException()
+ throws Exception {
+ BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2);
+ bpos.start();
+ try {
+ waitForInitialization(bpos);
+ // Should start with neither NN as active.
+ assertNull(bpos.getActiveNN());
+ // Have NN1 claim active at txid 1
+ mockHaStatuses[0] = new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1);
+ bpos.triggerHeartbeatForTests();
+ // Now mockNN1 is acting like active namenode and mockNN2 as Standby
+ assertSame(mockNN1, bpos.getActiveNN());
+ Mockito.doAnswer(new Answer<Void>() {
+ // Throw an IOException the first time this is called, which will
+ // in turn add the errorReport back to the bpThreadQueue so it is
+ // processed on the next iteration.
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ if (firstCallTime == 0) {
+ firstCallTime = Time.now();
+ throw new IOException();
+ } else {
+ secondCallTime = Time.now();
+ return null;
+ }
+ }
+ }).when(mockNN1).errorReport(Mockito.any(DatanodeRegistration.class),
+ Mockito.anyInt(), Mockito.anyString());
+ String errorString = "Can't send invalid block " + FAKE_BLOCK;
+ bpos.trySendErrorReport(DatanodeProtocol.INVALID_BLOCK, errorString);
+ Thread.sleep(10000);
+ assertTrue("Active namenode didn't add the report back to the queue "
+ + "when errorReport threw IOException", secondCallTime != 0);
+ } finally {
+ bpos.stop();
+ }
+ }
}