You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by br...@apache.org on 2014/08/18 23:26:46 UTC

svn commit: r1618747 - in /hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/server/datanode/ src/test/java/org/apache/hadoop/hdfs/server/datanode/

Author: brandonli
Date: Mon Aug 18 21:26:45 2014
New Revision: 1618747

URL: http://svn.apache.org/r1618747
Log:
HDFS-6569. Merging change r1618742 from trunk

Modified:
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeRollingUpgrade.java

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1618747&r1=1618746&r2=1618747&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Mon Aug 18 21:26:45 2014
@@ -259,6 +259,9 @@ Release 2.6.0 - UNRELEASED
     HDFS-6825. Edit log corruption due to delayed block removal.
     (Yongjun Zhang via wang)
 
+    HDFS-6569. OOB message can't be sent to the client when DataNode shuts down for upgrade
+    (brandonli)
+
 Release 2.5.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1618747&r1=1618746&r2=1618747&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Mon Aug 18 21:26:45 2014
@@ -712,7 +712,12 @@ class BlockReceiver implements Closeable
       LOG.warn("Error managing cache for writer of block " + block, t);
     }
   }
-
+  
+  public void sendOOB() throws IOException, InterruptedException { // restart OOB ack
+    final Daemon r = responder; // read once; may be null (e.g. responder not created yet)
+    if (r == null) { return; } // no PacketResponder: nothing to notify, and avoid an NPE
+    ((PacketResponder) r.getRunnable()).sendOOBResponse(PipelineAck.getRestartOOBStatus());
+  }
   void receiveBlock(
       DataOutputStream mirrOut, // output to next datanode
       DataInputStream mirrIn,   // input from next datanode
@@ -800,9 +805,7 @@ class BlockReceiver implements Closeable
               // The worst case is not recovering this RBW replica. 
               // Client will fall back to regular pipeline recovery.
             }
-            try {
-              ((PacketResponder) responder.getRunnable()).
-                  sendOOBResponse(PipelineAck.getRestartOOBStatus());
+            try {              
               // Even if the connection is closed after the ack packet is
               // flushed, the client can react to the connection closure 
               // first. Insert a delay to lower the chance of client 
@@ -810,8 +813,6 @@ class BlockReceiver implements Closeable
               Thread.sleep(1000);
             } catch (InterruptedException ie) {
               // It is already going down. Ignore this.
-            } catch (IOException ioe) {
-              LOG.info("Error sending OOB Ack.", ioe);
             }
           }
           responder.interrupt();

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1618747&r1=1618746&r2=1618747&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Mon Aug 18 21:26:45 2014
@@ -273,6 +273,7 @@ public class DataNode extends Configured
   public final static String EMPTY_DEL_HINT = "";
   final AtomicInteger xmitsInProgress = new AtomicInteger();
   Daemon dataXceiverServer = null;
+  DataXceiverServer xserver; // the Runnable wrapped by the dataXceiverServer Daemon
   Daemon localDataXceiverServer = null;
   ShortCircuitRegistry shortCircuitRegistry = null;
   ThreadGroup threadGroup = null;
@@ -656,8 +657,8 @@ public class DataNode extends Configured
     streamingAddr = tcpPeerServer.getStreamingAddr();
     LOG.info("Opened streaming server at " + streamingAddr);
     this.threadGroup = new ThreadGroup("dataXceiverServer");
-    this.dataXceiverServer = new Daemon(threadGroup, 
-        new DataXceiverServer(tcpPeerServer, conf, this));
+    xserver = new DataXceiverServer(tcpPeerServer, conf, this);
+    this.dataXceiverServer = new Daemon(threadGroup, xserver);
     this.threadGroup.setDaemon(true); // auto destroy when empty
 
     if (conf.getBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
@@ -1145,6 +1146,11 @@ public class DataNode extends Configured
   }
   
   @VisibleForTesting
+  public DataXceiverServer getXferServer() { // server behind the xceiver Daemon
+    return this.xserver;
+  }
+  
+  @VisibleForTesting
   public int getXferPort() {
     return streamingAddr.getPort();
   }
@@ -1402,6 +1408,7 @@ public class DataNode extends Configured
     // in order to avoid any further acceptance of requests, but the peers
     // for block writes are not closed until the clients are notified.
     if (dataXceiverServer != null) {
+      xserver.sendOOBToPeers();
       ((DataXceiverServer) this.dataXceiverServer.getRunnable()).kill();
       this.dataXceiverServer.interrupt();
     }

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1618747&r1=1618746&r2=1618747&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Mon Aug 18 21:26:45 2014
@@ -103,7 +103,8 @@ class DataXceiver extends Receiver imple
   private long opStartTime; //the start time of receiving an Op
   private final InputStream socketIn;
   private OutputStream socketOut;
-
+  private volatile BlockReceiver blockReceiver = null; // also read by sendOOB() from another thread
+  
   /**
    * Client Name used in previous operation. Not available on first request
    * on the socket.
@@ -159,6 +160,12 @@ class DataXceiver extends Receiver imple
     return socketOut;
   }
 
+  public void sendOOB() throws IOException, InterruptedException {
+    LOG.info("Sending OOB to peer: " + peer);
+    final BlockReceiver br = blockReceiver; // read once: the xceiver thread may null it concurrently
+    if (br != null) { br.sendOOB(); } // null unless a block is currently being received
+  }
+  
   /**
    * Read/write data from/to the DataXceiverServer.
    */
@@ -168,7 +175,7 @@ class DataXceiver extends Receiver imple
     Op op = null;
 
     try {
-      dataXceiverServer.addPeer(peer, Thread.currentThread());
+      dataXceiverServer.addPeer(peer, Thread.currentThread(), this);
       peer.setWriteTimeout(datanode.getDnConf().socketWriteTimeout);
       InputStream input = socketIn;
       IOStreamPair saslStreams = datanode.saslServer.receive(peer, socketOut,
@@ -584,7 +591,6 @@ class DataXceiver extends Receiver imple
     DataOutputStream mirrorOut = null;  // stream to next target
     DataInputStream mirrorIn = null;    // reply from next target
     Socket mirrorSock = null;           // socket to next target
-    BlockReceiver blockReceiver = null; // responsible for data handling
     String mirrorNode = null;           // the name:port of next target
     String firstBadLink = "";           // first datanode that failed in connection setup
     Status mirrorInStatus = SUCCESS;
@@ -747,6 +753,7 @@ class DataXceiver extends Receiver imple
       IOUtils.closeStream(replyOut);
       IOUtils.closeSocket(mirrorSock);
       IOUtils.closeStream(blockReceiver);
+      blockReceiver = null;
     }
 
     //update metrics

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java?rev=1618747&r1=1618746&r2=1618747&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java Mon Aug 18 21:26:45 2014
@@ -27,11 +27,11 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.net.PeerServer;
-import org.apache.hadoop.hdfs.server.balancer.Balancer;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.Daemon;
 
+import com.google.common.annotations.VisibleForTesting;
 
 /**
  * Server used for receiving/sending a block of data.
@@ -45,6 +45,7 @@ class DataXceiverServer implements Runna
   private final PeerServer peerServer;
   private final DataNode datanode;
   private final HashMap<Peer, Thread> peers = new HashMap<Peer, Thread>();
+  private final HashMap<Peer, DataXceiver> peersXceiver = new HashMap<Peer, DataXceiver>(); // mirrors peers' keys
   private boolean closed = false;
   
   /**
@@ -217,18 +218,38 @@ class DataXceiverServer implements Runna
     }
   }
   
-  synchronized void addPeer(Peer peer, Thread t) throws IOException {
+  synchronized void addPeer(Peer peer, Thread t, DataXceiver xceiver)
+      throws IOException {
     if (closed) {
       throw new IOException("Server closed.");
     }
     peers.put(peer, t);
+    peersXceiver.put(peer, xceiver); // remembered so sendOOBToPeers() can reach it
   }
 
   synchronized void closePeer(Peer peer) {
     peers.remove(peer);
+    peersXceiver.remove(peer); // keep peersXceiver in step with peers
     IOUtils.cleanup(null, peer);
   }
 
+  // Sending OOB to all peers
+  public synchronized void sendOOBToPeers() {
+    if (!datanode.shutdownForUpgrade) {
+      return;
+    }
+
+    for (Peer p : peers.keySet()) {
+      try {
+        peersXceiver.get(p).sendOOB();
+      } catch (IOException e) {
+        LOG.warn("Got error when sending OOB message.", e);
+      } catch (InterruptedException e) {
+        LOG.warn("Interrupted when sending OOB message.");
+      }
+    }
+  }
+  
   // Notify all peers of the shutdown and restart.
   // datanode.shouldRun should still be true and datanode.restarting should
   // be set true before calling this method.
@@ -247,6 +268,7 @@ class DataXceiverServer implements Runna
       IOUtils.cleanup(LOG, p);
     }
     peers.clear();
+    peersXceiver.clear();
   }
 
   // Return the number of peers.
@@ -254,7 +276,14 @@ class DataXceiverServer implements Runna
     return peers.size();
   }
 
+  // Number of peer -> DataXceiver entries currently tracked.
+  @VisibleForTesting
+  synchronized int getNumPeersXceiver() {
+    return this.peersXceiver.size();
+  }
+  
   synchronized void releasePeer(Peer peer) {
     peers.remove(peer);
+    peersXceiver.remove(peer); // keep peersXceiver in step with peers
   }
 }

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeRollingUpgrade.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeRollingUpgrade.java?rev=1618747&r1=1618746&r2=1618747&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeRollingUpgrade.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeRollingUpgrade.java Mon Aug 18 21:26:45 2014
@@ -27,11 +27,14 @@ import static org.junit.Assert.assertTru
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Random;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSOutputStream;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -67,6 +70,7 @@ public class TestDataNodeRollingUpgrade 
 
   private void startCluster() throws IOException {
     conf = new HdfsConfiguration();
+    conf.setInt("dfs.blocksize", 1024*1024);
     cluster = new Builder(conf).numDataNodes(REPL_FACTOR).build();
     cluster.waitActive();
     fs = cluster.getFileSystem();
@@ -243,4 +247,48 @@ public class TestDataNodeRollingUpgrade 
       shutdownCluster();
     }
   }
+  
+  // Checks that DataXceiverServer pairs every tracked peer with its DataXceiver
+  // (used to send OOB acks on shutdown for upgrade), with writers open and closed.
+  @Test (timeout=600000)
+  public void testDatanodePeersXceiver() throws Exception {
+    final int len = 1024 * 1024 * 8; // spans several of the 1MB test blocks
+    final DFSClient[] clients = new DFSClient[3];
+    final DFSOutputStream[] streams = new DFSOutputStream[clients.length];
+    try {
+      startCluster();
+      for (int i = 0; i < clients.length; i++) {
+        clients[i] = new DFSClient(NameNode.getAddress(conf), conf);
+        streams[i] = (DFSOutputStream) clients[i].create(
+            "/TestDataNodeXceiver" + (i + 1) + ".dat", true);
+      }
+      byte[] toWrite = new byte[len];
+      new Random(1111).nextBytes(toWrite);
+      for (DFSOutputStream s : streams) {
+        s.write(toWrite, 0, len);
+        s.flush();
+      }
+      // Hold the server's monitor so no peer is added/removed between the reads.
+      final DataXceiverServer xserver = dn.getXferServer();
+      synchronized (xserver) {
+        assertTrue(xserver.getNumPeers() == xserver.getNumPeersXceiver());
+      }
+      for (DFSOutputStream s : streams) {
+        s.close();
+      }
+      synchronized (xserver) {
+        assertTrue(xserver.getNumPeers() == xserver.getNumPeersXceiver());
+      }
+    } finally {
+      try {
+        for (DFSClient c : clients) {
+          if (c != null) {
+            c.close();
+          }
+        }
+      } finally {
+        shutdownCluster();
+      }
+    }
+  }
 }