You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by wa...@apache.org on 2014/08/01 20:56:26 UTC

svn commit: r1615191 - in /hadoop/common/branches/branch-2/hadoop-hdfs-project: ./ hadoop-hdfs/ hadoop-hdfs/CHANGES.txt hadoop-hdfs/src/main/java/ hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java

Author: wang
Date: Fri Aug  1 18:56:25 2014
New Revision: 1615191

URL: http://svn.apache.org/r1615191
Log:
HDFS-6788. Improve synchronization in BPOfferService with read write lock. Contributed by Yongjun Zhang.

Modified:
    hadoop/common/branches/branch-2/hadoop-hdfs-project/   (props changed)
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/   (props changed)
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/   (props changed)
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java

Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project:r1615190

Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs:r1615190

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1615191&r1=1615190&r2=1615191&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Fri Aug  1 18:56:25 2014
@@ -97,6 +97,9 @@ Release 2.6.0 - UNRELEASED
     HDFS-6802. Some tests in TestDFSClientFailover are missing @Test
     annotation. (Akira Ajisaka via wang)
 
+    HDFS-6788. Improve synchronization in BPOfferService with read write lock.
+    (Yongjun Zhang via wang)
+
   OPTIMIZATIONS
 
     HDFS-6690. Deduplicate xattr names in memory. (wang)

Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java:r1615190

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java?rev=1615191&r1=1615190&r2=1615191&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java Fri Aug  1 18:56:25 2014
@@ -21,6 +21,7 @@ import com.google.common.annotations.Vis
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
@@ -38,6 +39,8 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
  * One instance per block-pool/namespace on the DN, which handles the
@@ -91,6 +94,28 @@ class BPOfferService {
    */
   private long lastActiveClaimTxId = -1;
 
+  private final ReentrantReadWriteLock mReadWriteLock =
+      new ReentrantReadWriteLock();
+  private final Lock mReadLock  = mReadWriteLock.readLock();
+  private final Lock mWriteLock = mReadWriteLock.writeLock();
+
+  // utility methods to acquire and release read lock and write lock
+  void readLock() {
+    mReadLock.lock();
+  }
+
+  void readUnlock() {
+    mReadLock.unlock();
+  }
+
+  void writeLock() {
+    mWriteLock.lock();
+  }
+
+  void writeUnlock() {
+    mWriteLock.unlock();
+  }
+
   BPOfferService(List<InetSocketAddress> nnAddrs, DataNode dn) {
     Preconditions.checkArgument(!nnAddrs.isEmpty(),
         "Must pass at least one NN.");
@@ -135,14 +160,19 @@ class BPOfferService {
     }
     return false;
   }
-  
-  synchronized String getBlockPoolId() {
-    if (bpNSInfo != null) {
-      return bpNSInfo.getBlockPoolID();
-    } else {
-      LOG.warn("Block pool ID needed, but service not yet registered with NN",
-          new Exception("trace"));
-      return null;
+
+  String getBlockPoolId() {
+    readLock();
+    try {
+      if (bpNSInfo != null) {
+        return bpNSInfo.getBlockPoolID();
+      } else {
+        LOG.warn("Block pool ID needed, but service not yet registered with NN",
+            new Exception("trace"));
+        return null;
+      }
+    } finally {
+      readUnlock();
     }
   }
 
@@ -150,27 +180,37 @@ class BPOfferService {
     return getNamespaceInfo() != null;
   }
 
-  synchronized NamespaceInfo getNamespaceInfo() {
-    return bpNSInfo;
+  NamespaceInfo getNamespaceInfo() {
+    readLock();
+    try {
+      return bpNSInfo;
+    } finally {
+      readUnlock();
+    }
   }
 
   @Override
-  public synchronized String toString() {
-    if (bpNSInfo == null) {
-      // If we haven't yet connected to our NN, we don't yet know our
-      // own block pool ID.
-      // If _none_ of the block pools have connected yet, we don't even
-      // know the DatanodeID ID of this DN.
-      String datanodeUuid = dn.getDatanodeUuid();
+  public String toString() {
+    readLock();
+    try {
+      if (bpNSInfo == null) {
+        // If we haven't yet connected to our NN, we don't yet know our
+        // own block pool ID.
+        // If _none_ of the block pools have connected yet, we don't even
+        // know the DatanodeID ID of this DN.
+        String datanodeUuid = dn.getDatanodeUuid();
 
-      if (datanodeUuid == null || datanodeUuid.isEmpty()) {
-        datanodeUuid = "unassigned";
+        if (datanodeUuid == null || datanodeUuid.isEmpty()) {
+          datanodeUuid = "unassigned";
+        }
+        return "Block pool <registering> (Datanode Uuid " + datanodeUuid + ")";
+      } else {
+        return "Block pool " + getBlockPoolId() +
+            " (Datanode Uuid " + dn.getDatanodeUuid() +
+            ")";
       }
-      return "Block pool <registering> (Datanode Uuid " + datanodeUuid + ")";
-    } else {
-      return "Block pool " + getBlockPoolId() +
-          " (Datanode Uuid " + dn.getDatanodeUuid() +
-          ")";
+    } finally {
+      readUnlock();
     }
   }
   
@@ -266,32 +306,37 @@ class BPOfferService {
    * verifies that this namespace matches (eg to prevent a misconfiguration
    * where a StandbyNode from a different cluster is specified)
    */
-  synchronized void verifyAndSetNamespaceInfo(NamespaceInfo nsInfo) throws IOException {
-    if (this.bpNSInfo == null) {
-      this.bpNSInfo = nsInfo;
-      boolean success = false;
-
-      // Now that we know the namespace ID, etc, we can pass this to the DN.
-      // The DN can now initialize its local storage if we are the
-      // first BP to handshake, etc.
-      try {
-        dn.initBlockPool(this);
-        success = true;
-      } finally {
-        if (!success) {
-          // The datanode failed to initialize the BP. We need to reset
-          // the namespace info so that other BPService actors still have
-          // a chance to set it, and re-initialize the datanode.
-          this.bpNSInfo = null;
+  void verifyAndSetNamespaceInfo(NamespaceInfo nsInfo) throws IOException {
+    writeLock();
+    try {
+      if (this.bpNSInfo == null) {
+        this.bpNSInfo = nsInfo;
+        boolean success = false;
+
+        // Now that we know the namespace ID, etc, we can pass this to the DN.
+        // The DN can now initialize its local storage if we are the
+        // first BP to handshake, etc.
+        try {
+          dn.initBlockPool(this);
+          success = true;
+        } finally {
+          if (!success) {
+            // The datanode failed to initialize the BP. We need to reset
+            // the namespace info so that other BPService actors still have
+            // a chance to set it, and re-initialize the datanode.
+            this.bpNSInfo = null;
+          }
         }
+      } else {
+        checkNSEquality(bpNSInfo.getBlockPoolID(), nsInfo.getBlockPoolID(),
+            "Blockpool ID");
+        checkNSEquality(bpNSInfo.getNamespaceID(), nsInfo.getNamespaceID(),
+            "Namespace ID");
+        checkNSEquality(bpNSInfo.getClusterID(), nsInfo.getClusterID(),
+            "Cluster ID");
       }
-    } else {
-      checkNSEquality(bpNSInfo.getBlockPoolID(), nsInfo.getBlockPoolID(),
-          "Blockpool ID");
-      checkNSEquality(bpNSInfo.getNamespaceID(), nsInfo.getNamespaceID(),
-          "Namespace ID");
-      checkNSEquality(bpNSInfo.getClusterID(), nsInfo.getClusterID(),
-          "Cluster ID");
+    } finally {
+      writeUnlock();
     }
   }
 
@@ -300,22 +345,27 @@ class BPOfferService {
    * NN, it calls this function to verify that the NN it connected to
    * is consistent with other NNs serving the block-pool.
    */
-  synchronized void registrationSucceeded(BPServiceActor bpServiceActor,
+  void registrationSucceeded(BPServiceActor bpServiceActor,
       DatanodeRegistration reg) throws IOException {
-    if (bpRegistration != null) {
-      checkNSEquality(bpRegistration.getStorageInfo().getNamespaceID(),
-          reg.getStorageInfo().getNamespaceID(), "namespace ID");
-      checkNSEquality(bpRegistration.getStorageInfo().getClusterID(),
-          reg.getStorageInfo().getClusterID(), "cluster ID");
-    } else {
-      bpRegistration = reg;
-    }
-    
-    dn.bpRegistrationSucceeded(bpRegistration, getBlockPoolId());
-    // Add the initial block token secret keys to the DN's secret manager.
-    if (dn.isBlockTokenEnabled) {
-      dn.blockPoolTokenSecretManager.addKeys(getBlockPoolId(),
-          reg.getExportedKeys());
+    writeLock();
+    try {
+      if (bpRegistration != null) {
+        checkNSEquality(bpRegistration.getStorageInfo().getNamespaceID(),
+            reg.getStorageInfo().getNamespaceID(), "namespace ID");
+        checkNSEquality(bpRegistration.getStorageInfo().getClusterID(),
+            reg.getStorageInfo().getClusterID(), "cluster ID");
+      } else {
+        bpRegistration = reg;
+      }
+
+      dn.bpRegistrationSucceeded(bpRegistration, getBlockPoolId());
+      // Add the initial block token secret keys to the DN's secret manager.
+      if (dn.isBlockTokenEnabled) {
+        dn.blockPoolTokenSecretManager.addKeys(getBlockPoolId(),
+            reg.getExportedKeys());
+      }
+    } finally {
+      writeUnlock();
     }
   }
 
@@ -333,25 +383,35 @@ class BPOfferService {
     }
   }
 
-  synchronized DatanodeRegistration createRegistration() {
-    Preconditions.checkState(bpNSInfo != null,
-        "getRegistration() can only be called after initial handshake");
-    return dn.createBPRegistration(bpNSInfo);
+  DatanodeRegistration createRegistration() {
+    writeLock();
+    try {
+      Preconditions.checkState(bpNSInfo != null,
+          "getRegistration() can only be called after initial handshake");
+      return dn.createBPRegistration(bpNSInfo);
+    } finally {
+      writeUnlock();
+    }
   }
 
   /**
    * Called when an actor shuts down. If this is the last actor
    * to shut down, shuts down the whole blockpool in the DN.
    */
-  synchronized void shutdownActor(BPServiceActor actor) {
-    if (bpServiceToActive == actor) {
-      bpServiceToActive = null;
-    }
+  void shutdownActor(BPServiceActor actor) {
+    writeLock();
+    try {
+      if (bpServiceToActive == actor) {
+        bpServiceToActive = null;
+      }
 
-    bpServices.remove(actor);
+      bpServices.remove(actor);
 
-    if (bpServices.isEmpty()) {
-      dn.shutdownBlockPool(this);
+      if (bpServices.isEmpty()) {
+        dn.shutdownBlockPool(this);
+      }
+    } finally {
+      writeUnlock();
     }
   }
   
@@ -393,11 +453,16 @@ class BPOfferService {
    * @return a proxy to the active NN, or null if the BPOS has not
    * acknowledged any NN as active yet.
    */
-  synchronized DatanodeProtocolClientSideTranslatorPB getActiveNN() {
-    if (bpServiceToActive != null) {
-      return bpServiceToActive.bpNamenode;
-    } else {
-      return null;
+  DatanodeProtocolClientSideTranslatorPB getActiveNN() {
+    readLock();
+    try {
+      if (bpServiceToActive != null) {
+        return bpServiceToActive.bpNamenode;
+      } else {
+        return null;
+      }
+    } finally {
+      readUnlock();
     }
   }
 
@@ -425,45 +490,50 @@ class BPOfferService {
    * @param actor the actor which received the heartbeat
    * @param nnHaState the HA-related heartbeat contents
    */
-  synchronized void updateActorStatesFromHeartbeat(
+  void updateActorStatesFromHeartbeat(
       BPServiceActor actor,
       NNHAStatusHeartbeat nnHaState) {
-    final long txid = nnHaState.getTxId();
-    
-    final boolean nnClaimsActive =
-      nnHaState.getState() == HAServiceState.ACTIVE;
-    final boolean bposThinksActive = bpServiceToActive == actor;
-    final boolean isMoreRecentClaim = txid > lastActiveClaimTxId; 
-    
-    if (nnClaimsActive && !bposThinksActive) {
-      LOG.info("Namenode " + actor + " trying to claim ACTIVE state with " +
-          "txid=" + txid);
-      if (!isMoreRecentClaim) {
-        // Split-brain scenario - an NN is trying to claim active
-        // state when a different NN has already claimed it with a higher
-        // txid.
-        LOG.warn("NN " + actor + " tried to claim ACTIVE state at txid=" +
-            txid + " but there was already a more recent claim at txid=" +
-            lastActiveClaimTxId);
-        return;
-      } else {
-        if (bpServiceToActive == null) {
-          LOG.info("Acknowledging ACTIVE Namenode " + actor);
+    writeLock();
+    try {
+      final long txid = nnHaState.getTxId();
+
+      final boolean nnClaimsActive =
+          nnHaState.getState() == HAServiceState.ACTIVE;
+      final boolean bposThinksActive = bpServiceToActive == actor;
+      final boolean isMoreRecentClaim = txid > lastActiveClaimTxId;
+
+      if (nnClaimsActive && !bposThinksActive) {
+        LOG.info("Namenode " + actor + " trying to claim ACTIVE state with " +
+            "txid=" + txid);
+        if (!isMoreRecentClaim) {
+          // Split-brain scenario - an NN is trying to claim active
+          // state when a different NN has already claimed it with a higher
+          // txid.
+          LOG.warn("NN " + actor + " tried to claim ACTIVE state at txid=" +
+              txid + " but there was already a more recent claim at txid=" +
+              lastActiveClaimTxId);
+          return;
         } else {
-          LOG.info("Namenode " + actor + " taking over ACTIVE state from " +
-              bpServiceToActive + " at higher txid=" + txid);
+          if (bpServiceToActive == null) {
+            LOG.info("Acknowledging ACTIVE Namenode " + actor);
+          } else {
+            LOG.info("Namenode " + actor + " taking over ACTIVE state from " +
+                bpServiceToActive + " at higher txid=" + txid);
+          }
+          bpServiceToActive = actor;
         }
-        bpServiceToActive = actor;
+      } else if (!nnClaimsActive && bposThinksActive) {
+        LOG.info("Namenode " + actor + " relinquishing ACTIVE state with " +
+            "txid=" + nnHaState.getTxId());
+        bpServiceToActive = null;
       }
-    } else if (!nnClaimsActive && bposThinksActive) {
-      LOG.info("Namenode " + actor + " relinquishing ACTIVE state with " +
-          "txid=" + nnHaState.getTxId());
-      bpServiceToActive = null;
-    }
-    
-    if (bpServiceToActive == actor) {
-      assert txid >= lastActiveClaimTxId;
-      lastActiveClaimTxId = txid;
+
+      if (bpServiceToActive == actor) {
+        assert txid >= lastActiveClaimTxId;
+        lastActiveClaimTxId = txid;
+      }
+    } finally {
+      writeUnlock();
     }
   }
 
@@ -534,11 +604,14 @@ class BPOfferService {
       actor.reRegister();
       return true;
     }
-    synchronized (this) {
+    writeLock();
+    try {
     if (actor == bpServiceToActive) {
       return processCommandFromActive(cmd, actor);
     } else {
       return processCommandFromStandby(cmd, actor);
+    } finally {
+      writeUnlock();
     }
   }
   }