You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by wa...@apache.org on 2014/08/01 20:56:07 UTC
svn commit: r1615190 - in
/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: CHANGES.txt
src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
Author: wang
Date: Fri Aug 1 18:56:06 2014
New Revision: 1615190
URL: http://svn.apache.org/r1615190
Log:
HDFS-6788. Improve synchronization in BPOfferService with read write lock. Contributed by Yongjun Zhang.
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1615190&r1=1615189&r2=1615190&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Fri Aug 1 18:56:06 2014
@@ -349,6 +349,9 @@ Release 2.6.0 - UNRELEASED
HDFS-6802. Some tests in TestDFSClientFailover are missing @Test
annotation. (Akira Ajisaka via wang)
+ HDFS-6788. Improve synchronization in BPOfferService with read write lock.
+ (Yongjun Zhang via wang)
+
OPTIMIZATIONS
HDFS-6690. Deduplicate xattr names in memory. (wang)
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java?rev=1615190&r1=1615189&r2=1615190&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java Fri Aug 1 18:56:06 2014
@@ -21,6 +21,7 @@ import com.google.common.annotations.Vis
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+
import org.apache.commons.logging.Log;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
@@ -38,6 +39,8 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* One instance per block-pool/namespace on the DN, which handles the
@@ -91,6 +94,28 @@ class BPOfferService {
*/
private long lastActiveClaimTxId = -1;
+ private final ReentrantReadWriteLock mReadWriteLock =
+ new ReentrantReadWriteLock();
+ private final Lock mReadLock = mReadWriteLock.readLock();
+ private final Lock mWriteLock = mReadWriteLock.writeLock();
+
+ // utility methods to acquire and release read lock and write lock
+ void readLock() {
+ mReadLock.lock();
+ }
+
+ void readUnlock() {
+ mReadLock.unlock();
+ }
+
+ void writeLock() {
+ mWriteLock.lock();
+ }
+
+ void writeUnlock() {
+ mWriteLock.unlock();
+ }
+
BPOfferService(List<InetSocketAddress> nnAddrs, DataNode dn) {
Preconditions.checkArgument(!nnAddrs.isEmpty(),
"Must pass at least one NN.");
@@ -135,14 +160,19 @@ class BPOfferService {
}
return false;
}
-
- synchronized String getBlockPoolId() {
- if (bpNSInfo != null) {
- return bpNSInfo.getBlockPoolID();
- } else {
- LOG.warn("Block pool ID needed, but service not yet registered with NN",
- new Exception("trace"));
- return null;
+
+ String getBlockPoolId() {
+ readLock();
+ try {
+ if (bpNSInfo != null) {
+ return bpNSInfo.getBlockPoolID();
+ } else {
+ LOG.warn("Block pool ID needed, but service not yet registered with NN",
+ new Exception("trace"));
+ return null;
+ }
+ } finally {
+ readUnlock();
}
}
@@ -150,27 +180,37 @@ class BPOfferService {
return getNamespaceInfo() != null;
}
- synchronized NamespaceInfo getNamespaceInfo() {
- return bpNSInfo;
+ NamespaceInfo getNamespaceInfo() {
+ readLock();
+ try {
+ return bpNSInfo;
+ } finally {
+ readUnlock();
+ }
}
@Override
- public synchronized String toString() {
- if (bpNSInfo == null) {
- // If we haven't yet connected to our NN, we don't yet know our
- // own block pool ID.
- // If _none_ of the block pools have connected yet, we don't even
- // know the DatanodeID ID of this DN.
- String datanodeUuid = dn.getDatanodeUuid();
+ public String toString() {
+ readLock();
+ try {
+ if (bpNSInfo == null) {
+ // If we haven't yet connected to our NN, we don't yet know our
+ // own block pool ID.
+ // If _none_ of the block pools have connected yet, we don't even
+ // know the DatanodeID ID of this DN.
+ String datanodeUuid = dn.getDatanodeUuid();
- if (datanodeUuid == null || datanodeUuid.isEmpty()) {
- datanodeUuid = "unassigned";
+ if (datanodeUuid == null || datanodeUuid.isEmpty()) {
+ datanodeUuid = "unassigned";
+ }
+ return "Block pool <registering> (Datanode Uuid " + datanodeUuid + ")";
+ } else {
+ return "Block pool " + getBlockPoolId() +
+ " (Datanode Uuid " + dn.getDatanodeUuid() +
+ ")";
}
- return "Block pool <registering> (Datanode Uuid " + datanodeUuid + ")";
- } else {
- return "Block pool " + getBlockPoolId() +
- " (Datanode Uuid " + dn.getDatanodeUuid() +
- ")";
+ } finally {
+ readUnlock();
}
}
@@ -266,32 +306,37 @@ class BPOfferService {
* verifies that this namespace matches (eg to prevent a misconfiguration
* where a StandbyNode from a different cluster is specified)
*/
- synchronized void verifyAndSetNamespaceInfo(NamespaceInfo nsInfo) throws IOException {
- if (this.bpNSInfo == null) {
- this.bpNSInfo = nsInfo;
- boolean success = false;
-
- // Now that we know the namespace ID, etc, we can pass this to the DN.
- // The DN can now initialize its local storage if we are the
- // first BP to handshake, etc.
- try {
- dn.initBlockPool(this);
- success = true;
- } finally {
- if (!success) {
- // The datanode failed to initialize the BP. We need to reset
- // the namespace info so that other BPService actors still have
- // a chance to set it, and re-initialize the datanode.
- this.bpNSInfo = null;
+ void verifyAndSetNamespaceInfo(NamespaceInfo nsInfo) throws IOException {
+ writeLock();
+ try {
+ if (this.bpNSInfo == null) {
+ this.bpNSInfo = nsInfo;
+ boolean success = false;
+
+ // Now that we know the namespace ID, etc, we can pass this to the DN.
+ // The DN can now initialize its local storage if we are the
+ // first BP to handshake, etc.
+ try {
+ dn.initBlockPool(this);
+ success = true;
+ } finally {
+ if (!success) {
+ // The datanode failed to initialize the BP. We need to reset
+ // the namespace info so that other BPService actors still have
+ // a chance to set it, and re-initialize the datanode.
+ this.bpNSInfo = null;
+ }
}
+ } else {
+ checkNSEquality(bpNSInfo.getBlockPoolID(), nsInfo.getBlockPoolID(),
+ "Blockpool ID");
+ checkNSEquality(bpNSInfo.getNamespaceID(), nsInfo.getNamespaceID(),
+ "Namespace ID");
+ checkNSEquality(bpNSInfo.getClusterID(), nsInfo.getClusterID(),
+ "Cluster ID");
}
- } else {
- checkNSEquality(bpNSInfo.getBlockPoolID(), nsInfo.getBlockPoolID(),
- "Blockpool ID");
- checkNSEquality(bpNSInfo.getNamespaceID(), nsInfo.getNamespaceID(),
- "Namespace ID");
- checkNSEquality(bpNSInfo.getClusterID(), nsInfo.getClusterID(),
- "Cluster ID");
+ } finally {
+ writeUnlock();
}
}
@@ -300,22 +345,27 @@ class BPOfferService {
* NN, it calls this function to verify that the NN it connected to
* is consistent with other NNs serving the block-pool.
*/
- synchronized void registrationSucceeded(BPServiceActor bpServiceActor,
+ void registrationSucceeded(BPServiceActor bpServiceActor,
DatanodeRegistration reg) throws IOException {
- if (bpRegistration != null) {
- checkNSEquality(bpRegistration.getStorageInfo().getNamespaceID(),
- reg.getStorageInfo().getNamespaceID(), "namespace ID");
- checkNSEquality(bpRegistration.getStorageInfo().getClusterID(),
- reg.getStorageInfo().getClusterID(), "cluster ID");
- } else {
- bpRegistration = reg;
- }
-
- dn.bpRegistrationSucceeded(bpRegistration, getBlockPoolId());
- // Add the initial block token secret keys to the DN's secret manager.
- if (dn.isBlockTokenEnabled) {
- dn.blockPoolTokenSecretManager.addKeys(getBlockPoolId(),
- reg.getExportedKeys());
+ writeLock();
+ try {
+ if (bpRegistration != null) {
+ checkNSEquality(bpRegistration.getStorageInfo().getNamespaceID(),
+ reg.getStorageInfo().getNamespaceID(), "namespace ID");
+ checkNSEquality(bpRegistration.getStorageInfo().getClusterID(),
+ reg.getStorageInfo().getClusterID(), "cluster ID");
+ } else {
+ bpRegistration = reg;
+ }
+
+ dn.bpRegistrationSucceeded(bpRegistration, getBlockPoolId());
+ // Add the initial block token secret keys to the DN's secret manager.
+ if (dn.isBlockTokenEnabled) {
+ dn.blockPoolTokenSecretManager.addKeys(getBlockPoolId(),
+ reg.getExportedKeys());
+ }
+ } finally {
+ writeUnlock();
}
}
@@ -333,25 +383,35 @@ class BPOfferService {
}
}
- synchronized DatanodeRegistration createRegistration() {
- Preconditions.checkState(bpNSInfo != null,
- "getRegistration() can only be called after initial handshake");
- return dn.createBPRegistration(bpNSInfo);
+ DatanodeRegistration createRegistration() {
+ writeLock();
+ try {
+ Preconditions.checkState(bpNSInfo != null,
+ "getRegistration() can only be called after initial handshake");
+ return dn.createBPRegistration(bpNSInfo);
+ } finally {
+ writeUnlock();
+ }
}
/**
* Called when an actor shuts down. If this is the last actor
* to shut down, shuts down the whole blockpool in the DN.
*/
- synchronized void shutdownActor(BPServiceActor actor) {
- if (bpServiceToActive == actor) {
- bpServiceToActive = null;
- }
+ void shutdownActor(BPServiceActor actor) {
+ writeLock();
+ try {
+ if (bpServiceToActive == actor) {
+ bpServiceToActive = null;
+ }
- bpServices.remove(actor);
+ bpServices.remove(actor);
- if (bpServices.isEmpty()) {
- dn.shutdownBlockPool(this);
+ if (bpServices.isEmpty()) {
+ dn.shutdownBlockPool(this);
+ }
+ } finally {
+ writeUnlock();
}
}
@@ -392,11 +452,16 @@ class BPOfferService {
* @return a proxy to the active NN, or null if the BPOS has not
* acknowledged any NN as active yet.
*/
- synchronized DatanodeProtocolClientSideTranslatorPB getActiveNN() {
- if (bpServiceToActive != null) {
- return bpServiceToActive.bpNamenode;
- } else {
- return null;
+ DatanodeProtocolClientSideTranslatorPB getActiveNN() {
+ readLock();
+ try {
+ if (bpServiceToActive != null) {
+ return bpServiceToActive.bpNamenode;
+ } else {
+ return null;
+ }
+ } finally {
+ readUnlock();
}
}
@@ -424,45 +489,50 @@ class BPOfferService {
* @param actor the actor which received the heartbeat
* @param nnHaState the HA-related heartbeat contents
*/
- synchronized void updateActorStatesFromHeartbeat(
+ void updateActorStatesFromHeartbeat(
BPServiceActor actor,
NNHAStatusHeartbeat nnHaState) {
- final long txid = nnHaState.getTxId();
-
- final boolean nnClaimsActive =
- nnHaState.getState() == HAServiceState.ACTIVE;
- final boolean bposThinksActive = bpServiceToActive == actor;
- final boolean isMoreRecentClaim = txid > lastActiveClaimTxId;
-
- if (nnClaimsActive && !bposThinksActive) {
- LOG.info("Namenode " + actor + " trying to claim ACTIVE state with " +
- "txid=" + txid);
- if (!isMoreRecentClaim) {
- // Split-brain scenario - an NN is trying to claim active
- // state when a different NN has already claimed it with a higher
- // txid.
- LOG.warn("NN " + actor + " tried to claim ACTIVE state at txid=" +
- txid + " but there was already a more recent claim at txid=" +
- lastActiveClaimTxId);
- return;
- } else {
- if (bpServiceToActive == null) {
- LOG.info("Acknowledging ACTIVE Namenode " + actor);
+ writeLock();
+ try {
+ final long txid = nnHaState.getTxId();
+
+ final boolean nnClaimsActive =
+ nnHaState.getState() == HAServiceState.ACTIVE;
+ final boolean bposThinksActive = bpServiceToActive == actor;
+ final boolean isMoreRecentClaim = txid > lastActiveClaimTxId;
+
+ if (nnClaimsActive && !bposThinksActive) {
+ LOG.info("Namenode " + actor + " trying to claim ACTIVE state with " +
+ "txid=" + txid);
+ if (!isMoreRecentClaim) {
+ // Split-brain scenario - an NN is trying to claim active
+ // state when a different NN has already claimed it with a higher
+ // txid.
+ LOG.warn("NN " + actor + " tried to claim ACTIVE state at txid=" +
+ txid + " but there was already a more recent claim at txid=" +
+ lastActiveClaimTxId);
+ return;
} else {
- LOG.info("Namenode " + actor + " taking over ACTIVE state from " +
- bpServiceToActive + " at higher txid=" + txid);
+ if (bpServiceToActive == null) {
+ LOG.info("Acknowledging ACTIVE Namenode " + actor);
+ } else {
+ LOG.info("Namenode " + actor + " taking over ACTIVE state from " +
+ bpServiceToActive + " at higher txid=" + txid);
+ }
+ bpServiceToActive = actor;
}
- bpServiceToActive = actor;
+ } else if (!nnClaimsActive && bposThinksActive) {
+ LOG.info("Namenode " + actor + " relinquishing ACTIVE state with " +
+ "txid=" + nnHaState.getTxId());
+ bpServiceToActive = null;
}
- } else if (!nnClaimsActive && bposThinksActive) {
- LOG.info("Namenode " + actor + " relinquishing ACTIVE state with " +
- "txid=" + nnHaState.getTxId());
- bpServiceToActive = null;
- }
-
- if (bpServiceToActive == actor) {
- assert txid >= lastActiveClaimTxId;
- lastActiveClaimTxId = txid;
+
+ if (bpServiceToActive == actor) {
+ assert txid >= lastActiveClaimTxId;
+ lastActiveClaimTxId = txid;
+ }
+ } finally {
+ writeUnlock();
}
}
@@ -533,12 +603,15 @@ class BPOfferService {
actor.reRegister();
return true;
}
- synchronized (this) {
+ writeLock();
+ try {
if (actor == bpServiceToActive) {
return processCommandFromActive(cmd, actor);
} else {
return processCommandFromStandby(cmd, actor);
}
+ } finally {
+ writeUnlock();
}
}