You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by br...@apache.org on 2014/08/18 23:26:46 UTC
svn commit: r1618747 - in
/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/org/apache/hadoop/hdfs/server/datanode/
src/test/java/org/apache/hadoop/hdfs/server/datanode/
Author: brandonli
Date: Mon Aug 18 21:26:45 2014
New Revision: 1618747
URL: http://svn.apache.org/r1618747
Log:
HDFS-6569. Merging change r1618742 from trunk
Modified:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeRollingUpgrade.java
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1618747&r1=1618746&r2=1618747&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Mon Aug 18 21:26:45 2014
@@ -259,6 +259,9 @@ Release 2.6.0 - UNRELEASED
HDFS-6825. Edit log corruption due to delayed block removal.
(Yongjun Zhang via wang)
+ HDFS-6569. OOB message can't be sent to the client when DataNode shuts down for upgrade
+ (brandonli)
+
Release 2.5.0 - UNRELEASED
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1618747&r1=1618746&r2=1618747&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Mon Aug 18 21:26:45 2014
@@ -712,7 +712,12 @@ class BlockReceiver implements Closeable
LOG.warn("Error managing cache for writer of block " + block, t);
}
}
-
+
+ public void sendOOB() throws IOException, InterruptedException { // ask the PacketResponder (if any) to send a restart OOB ack
+ final Daemon r = responder; // read once: may be null when no PacketResponder is running for this write (confirm all cases)
+ if (r != null) { ((PacketResponder) r.getRunnable()).sendOOBResponse(PipelineAck.getRestartOOBStatus()); }
+ }
+
void receiveBlock(
DataOutputStream mirrOut, // output to next datanode
DataInputStream mirrIn, // input from next datanode
@@ -800,9 +805,7 @@ class BlockReceiver implements Closeable
// The worst case is not recovering this RBW replica.
// Client will fall back to regular pipeline recovery.
}
- try {
- ((PacketResponder) responder.getRunnable()).
- sendOOBResponse(PipelineAck.getRestartOOBStatus());
+ try {
// Even if the connection is closed after the ack packet is
// flushed, the client can react to the connection closure
// first. Insert a delay to lower the chance of client
@@ -810,8 +813,6 @@ class BlockReceiver implements Closeable
Thread.sleep(1000);
} catch (InterruptedException ie) {
// It is already going down. Ignore this.
- } catch (IOException ioe) {
- LOG.info("Error sending OOB Ack.", ioe);
}
}
responder.interrupt();
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1618747&r1=1618746&r2=1618747&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Mon Aug 18 21:26:45 2014
@@ -273,6 +273,7 @@ public class DataNode extends Configured
public final static String EMPTY_DEL_HINT = "";
final AtomicInteger xmitsInProgress = new AtomicInteger();
Daemon dataXceiverServer = null;
+ DataXceiverServer xserver = null;
Daemon localDataXceiverServer = null;
ShortCircuitRegistry shortCircuitRegistry = null;
ThreadGroup threadGroup = null;
@@ -656,8 +657,8 @@ public class DataNode extends Configured
streamingAddr = tcpPeerServer.getStreamingAddr();
LOG.info("Opened streaming server at " + streamingAddr);
this.threadGroup = new ThreadGroup("dataXceiverServer");
- this.dataXceiverServer = new Daemon(threadGroup,
- new DataXceiverServer(tcpPeerServer, conf, this));
+ xserver = new DataXceiverServer(tcpPeerServer, conf, this);
+ this.dataXceiverServer = new Daemon(threadGroup, xserver);
this.threadGroup.setDaemon(true); // auto destroy when empty
if (conf.getBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
@@ -1145,6 +1146,11 @@ public class DataNode extends Configured
}
@VisibleForTesting
+ public DataXceiverServer getXferServer() {
+ return xserver;
+ }
+
+ @VisibleForTesting
public int getXferPort() {
return streamingAddr.getPort();
}
@@ -1402,6 +1408,7 @@ public class DataNode extends Configured
// in order to avoid any further acceptance of requests, but the peers
// for block writes are not closed until the clients are notified.
if (dataXceiverServer != null) {
+ xserver.sendOOBToPeers();
((DataXceiverServer) this.dataXceiverServer.getRunnable()).kill();
this.dataXceiverServer.interrupt();
}
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1618747&r1=1618746&r2=1618747&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Mon Aug 18 21:26:45 2014
@@ -103,7 +103,8 @@ class DataXceiver extends Receiver imple
private long opStartTime; //the start time of receiving an Op
private final InputStream socketIn;
private OutputStream socketOut;
-
+ private volatile BlockReceiver blockReceiver = null; // volatile: read by the shutdown thread via sendOOB()
+
/**
* Client Name used in previous operation. Not available on first request
* on the socket.
@@ -159,6 +160,12 @@ class DataXceiver extends Receiver imple
return socketOut;
}
+ public void sendOOB() throws IOException, InterruptedException { // forward a restart OOB to the active block write, if any
+ final BlockReceiver br = blockReceiver; // read once: the write path sets the field to null concurrently
+ LOG.info("Sending OOB to peer: " + peer);
+ if (br != null) { br.sendOOB(); }
+ }
+
/**
* Read/write data from/to the DataXceiverServer.
*/
@@ -168,7 +175,7 @@ class DataXceiver extends Receiver imple
Op op = null;
try {
- dataXceiverServer.addPeer(peer, Thread.currentThread());
+ dataXceiverServer.addPeer(peer, Thread.currentThread(), this);
peer.setWriteTimeout(datanode.getDnConf().socketWriteTimeout);
InputStream input = socketIn;
IOStreamPair saslStreams = datanode.saslServer.receive(peer, socketOut,
@@ -584,7 +591,6 @@ class DataXceiver extends Receiver imple
DataOutputStream mirrorOut = null; // stream to next target
DataInputStream mirrorIn = null; // reply from next target
Socket mirrorSock = null; // socket to next target
- BlockReceiver blockReceiver = null; // responsible for data handling
String mirrorNode = null; // the name:port of next target
String firstBadLink = ""; // first datanode that failed in connection setup
Status mirrorInStatus = SUCCESS;
@@ -747,6 +753,7 @@ class DataXceiver extends Receiver imple
IOUtils.closeStream(replyOut);
IOUtils.closeSocket(mirrorSock);
IOUtils.closeStream(blockReceiver);
+ blockReceiver = null;
}
//update metrics
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java?rev=1618747&r1=1618746&r2=1618747&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java Mon Aug 18 21:26:45 2014
@@ -27,11 +27,11 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.net.PeerServer;
-import org.apache.hadoop.hdfs.server.balancer.Balancer;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Daemon;
+import com.google.common.annotations.VisibleForTesting;
/**
* Server used for receiving/sending a block of data.
@@ -45,6 +45,7 @@ class DataXceiverServer implements Runna
private final PeerServer peerServer;
private final DataNode datanode;
private final HashMap<Peer, Thread> peers = new HashMap<Peer, Thread>();
+ private final HashMap<Peer, DataXceiver> peersXceiver = new HashMap<Peer, DataXceiver>();
private boolean closed = false;
/**
@@ -217,18 +218,38 @@ class DataXceiverServer implements Runna
}
}
- synchronized void addPeer(Peer peer, Thread t) throws IOException {
+ synchronized void addPeer(Peer peer, Thread t, DataXceiver xceiver)
+ throws IOException {
if (closed) {
throw new IOException("Server closed.");
}
peers.put(peer, t);
+ peersXceiver.put(peer, xceiver);
}
synchronized void closePeer(Peer peer) {
peers.remove(peer);
+ peersXceiver.remove(peer);
IOUtils.cleanup(null, peer);
}
+ // Best-effort: ask every registered DataXceiver to send a restart OOB message to its peer.
+ public synchronized void sendOOBToPeers() { // the server lock is held across all sends, blocking addPeer/closePeer/releasePeer meanwhile
+ if (!datanode.shutdownForUpgrade) { // no-op unless the DataNode is shutting down for upgrade
+ return;
+ }
+
+ for (Peer p : peers.keySet()) { // addPeer/closePeer/releasePeer/closeAllPeers update peersXceiver in step with peers
+ try {
+ peersXceiver.get(p).sendOOB(); // unchecked exceptions are not caught here and would end the loop
+ } catch (IOException e) {
+ LOG.warn("Got error when sending OOB message.", e);
+ } catch (InterruptedException e) { // NOTE(review): interrupt status is not restored; remaining peers are still tried
+ LOG.warn("Interrupted when sending OOB message.");
+ }
+ }
+ }
+
// Notify all peers of the shutdown and restart.
// datanode.shouldRun should still be true and datanode.restarting should
// be set true before calling this method.
@@ -247,6 +268,7 @@ class DataXceiverServer implements Runna
IOUtils.cleanup(LOG, p);
}
peers.clear();
+ peersXceiver.clear();
}
// Return the number of peers.
@@ -254,7 +276,14 @@ class DataXceiverServer implements Runna
return peers.size();
}
+ // Return the number of peers and DataXceivers.
+ @VisibleForTesting
+ synchronized int getNumPeersXceiver() {
+ return peersXceiver.size();
+ }
+
synchronized void releasePeer(Peer peer) {
peers.remove(peer);
+ peersXceiver.remove(peer);
}
}
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeRollingUpgrade.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeRollingUpgrade.java?rev=1618747&r1=1618746&r2=1618747&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeRollingUpgrade.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeRollingUpgrade.java Mon Aug 18 21:26:45 2014
@@ -27,11 +27,14 @@ import static org.junit.Assert.assertTru
import java.io.File;
import java.io.IOException;
+import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSOutputStream;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -67,6 +70,7 @@ public class TestDataNodeRollingUpgrade
private void startCluster() throws IOException {
conf = new HdfsConfiguration();
+ conf.setInt("dfs.blocksize", 1024*1024);
cluster = new Builder(conf).numDataNodes(REPL_FACTOR).build();
cluster.waitActive();
fs = cluster.getFileSystem();
@@ -243,4 +247,48 @@ public class TestDataNodeRollingUpgrade
shutdownCluster();
}
}
+
+ @Test (timeout=600000)
+ // Test DataXceiverServer keeps exactly one DataXceiver per registered peer (needed for sending OOB messages)
+ public void testDatanodePeersXceiver() throws Exception {
+ try {
+ startCluster();
+
+ // Create files in DFS.
+ String testFile1 = "/TestDataNodeXceiver1.dat";
+ String testFile2 = "/TestDataNodeXceiver2.dat";
+ String testFile3 = "/TestDataNodeXceiver3.dat";
+
+ DFSClient client1 = new DFSClient(NameNode.getAddress(conf), conf);
+ DFSClient client2 = new DFSClient(NameNode.getAddress(conf), conf);
+ DFSClient client3 = new DFSClient(NameNode.getAddress(conf), conf);
+
+ DFSOutputStream s1 = (DFSOutputStream) client1.create(testFile1, true);
+ DFSOutputStream s2 = (DFSOutputStream) client2.create(testFile2, true);
+ DFSOutputStream s3 = (DFSOutputStream) client3.create(testFile3, true);
+
+ byte[] toWrite = new byte[1024*1024*8];
+ Random rb = new Random(1111);
+ rb.nextBytes(toWrite);
+ s1.write(toWrite, 0, 1024*1024*8);
+ s1.flush();
+ s2.write(toWrite, 0, 1024*1024*8);
+ s2.flush();
+ s3.write(toWrite, 0, 1024*1024*8);
+ s3.flush();
+
+ assertTrue(dn.getXferServer().getNumPeersXceiver() == dn.getXferServer()
+ .getNumPeers()); // while writes are open: peers and peersXceiver sizes must match
+ s1.close();
+ s2.close();
+ s3.close();
+ assertTrue(dn.getXferServer().getNumPeersXceiver() == dn.getXferServer()
+ .getNumPeers()); // after close: entries must have been removed from both maps together
+ client1.close();
+ client2.close();
+ client3.close();
+ } finally {
+ shutdownCluster();
+ }
+ }
}