You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2011/06/21 22:09:57 UTC
svn commit: r1138160 [2/6] - in /hadoop/common/branches/HDFS-1073/hdfs: ./
bin/ src/c++/libhdfs/ src/c++/libhdfs/tests/ src/contrib/
src/contrib/fuse-dfs/ src/contrib/fuse-dfs/src/ src/contrib/hdfsproxy/
src/docs/src/documentation/content/xdocs/ src/ja...
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/BlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/BlockReader.java?rev=1138160&r1=1138159&r2=1138160&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/BlockReader.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/BlockReader.java Tue Jun 21 20:09:54 2011
@@ -17,9 +17,7 @@
*/
package org.apache.hadoop.hdfs;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.CHECKSUM_OK;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR_ACCESS_TOKEN;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.SUCCESS;
+import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
@@ -34,9 +32,12 @@ import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FSInputChecker;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
@@ -45,6 +46,7 @@ import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
+
/** This is a wrapper around connection to datanode
* and understands checksum, offset etc.
*
@@ -138,9 +140,9 @@ public class BlockReader extends FSInput
// if eos was set in the previous read, send a status code to the DN
if (eos && !eosBefore && nRead >= 0) {
if (needChecksum()) {
- sendReadResult(dnSock, CHECKSUM_OK);
+ sendReadResult(dnSock, Status.CHECKSUM_OK);
} else {
- sendReadResult(dnSock, SUCCESS);
+ sendReadResult(dnSock, Status.SUCCESS);
}
}
return nRead;
@@ -313,20 +315,13 @@ public class BlockReader extends FSInput
pos + bytesToRead >= bytesNeededToFinish) {
// Read header
- int packetLen = in.readInt();
- long offsetInBlock = in.readLong();
- long seqno = in.readLong();
- boolean lastPacketInBlock = in.readBoolean();
- int dataLen = in.readInt();
+ PacketHeader hdr = new PacketHeader();
+ hdr.readFields(in);
- if (!lastPacketInBlock ||
- dataLen != 0) {
+ if (!hdr.isLastPacketInBlock() ||
+ hdr.getDataLen() != 0) {
throw new IOException("Expected empty end-of-read packet! Header: " +
- "(packetLen : " + packetLen +
- ", offsetInBlock : " + offsetInBlock +
- ", seqno : " + seqno +
- ", lastInBlock : " + lastPacketInBlock +
- ", dataLen : " + dataLen);
+ hdr);
}
eos = true;
@@ -409,7 +404,7 @@ public class BlockReader extends FSInput
String clientName)
throws IOException {
// in and out will be closed when sock is closed (by the caller)
- DataTransferProtocol.Sender.opReadBlock(
+ Sender.opReadBlock(
new DataOutputStream(new BufferedOutputStream(
NetUtils.getOutputStream(sock,HdfsConstants.WRITE_TIMEOUT))),
block, startOffset, len, clientName, blockToken);
@@ -422,9 +417,10 @@ public class BlockReader extends FSInput
new BufferedInputStream(NetUtils.getInputStream(sock),
bufferSize));
- DataTransferProtocol.Status status = DataTransferProtocol.Status.read(in);
- if (status != SUCCESS) {
- if (status == ERROR_ACCESS_TOKEN) {
+ BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
+ vintPrefixed(in));
+ if (status.getStatus() != Status.SUCCESS) {
+ if (status.getStatus() == Status.ERROR_ACCESS_TOKEN) {
throw new InvalidBlockTokenException(
"Got access token error for OP_READ_BLOCK, self="
+ sock.getLocalSocketAddress() + ", remote="
@@ -499,11 +495,16 @@ public class BlockReader extends FSInput
* closing our connection (which we will re-open), but won't affect
* data correctness.
*/
- void sendReadResult(Socket sock, DataTransferProtocol.Status statusCode) {
+ void sendReadResult(Socket sock, Status statusCode) {
assert !sentStatusCode : "already sent status code to " + sock;
try {
OutputStream out = NetUtils.getOutputStream(sock, HdfsConstants.WRITE_TIMEOUT);
- statusCode.writeOutputStream(out);
+
+ ClientReadStatusProto.newBuilder()
+ .setStatus(statusCode)
+ .build()
+ .writeDelimitedTo(out);
+
out.flush();
sentStatusCode = true;
} catch (IOException e) {
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1138160&r1=1138159&r2=1138160&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/DFSClient.java Tue Jun 21 20:09:54 2011
@@ -18,10 +18,6 @@
*/
package org.apache.hadoop.hdfs;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Op.BLOCK_CHECKSUM;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR_ACCESS_TOKEN;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.SUCCESS;
-
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
@@ -65,17 +61,23 @@ import org.apache.hadoop.hdfs.protocol.C
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
+import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
@@ -131,7 +133,7 @@ public class DFSClient implements FSCons
SocketFactory socketFactory;
int socketTimeout;
final int writePacketSize;
- final DataTransferProtocol.ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure;
+ final ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure;
final FileSystem.Statistics stats;
final int hdfsTimeout; // timeout value for a DFS operation.
final LeaseRenewer leaserenewer;
@@ -265,7 +267,7 @@ public class DFSClient implements FSCons
this.writePacketSize =
conf.getInt(DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
- this.dtpReplaceDatanodeOnFailure = DataTransferProtocol.ReplaceDatanodeOnFailure.get(conf);
+ this.dtpReplaceDatanodeOnFailure = ReplaceDatanodeOnFailure.get(conf);
// The hdfsTimeout is currently the same as the ipc timeout
this.hdfsTimeout = Client.getTimeout(conf);
@@ -1112,15 +1114,16 @@ public class DFSClient implements FSCons
if (LOG.isDebugEnabled()) {
LOG.debug("write to " + datanodes[j].getName() + ": "
- + BLOCK_CHECKSUM + ", block=" + block);
+ + Op.BLOCK_CHECKSUM + ", block=" + block);
}
// get block MD5
- DataTransferProtocol.Sender.opBlockChecksum(out, block,
- lb.getBlockToken());
+ Sender.opBlockChecksum(out, block, lb.getBlockToken());
+
+ final BlockOpResponseProto reply =
+ BlockOpResponseProto.parseFrom(HdfsProtoUtil.vintPrefixed(in));
- final DataTransferProtocol.Status reply = DataTransferProtocol.Status.read(in);
- if (reply != SUCCESS) {
- if (reply == ERROR_ACCESS_TOKEN
+ if (reply.getStatus() != Status.SUCCESS) {
+ if (reply.getStatus() == Status.ERROR_ACCESS_TOKEN
&& i > lastRetriedIndex) {
if (LOG.isDebugEnabled()) {
LOG.debug("Got access token error in response to OP_BLOCK_CHECKSUM "
@@ -1138,9 +1141,12 @@ public class DFSClient implements FSCons
+ block + " from datanode " + datanodes[j].getName());
}
}
+
+ OpBlockChecksumResponseProto checksumData =
+ reply.getChecksumResponse();
//read byte-per-checksum
- final int bpc = in.readInt();
+ final int bpc = checksumData.getBytesPerCrc();
if (i == 0) { //first block
bytesPerCRC = bpc;
}
@@ -1150,13 +1156,14 @@ public class DFSClient implements FSCons
}
//read crc-per-block
- final long cpb = in.readLong();
+ final long cpb = checksumData.getCrcPerBlock();
if (locatedblocks.size() > 1 && i == 0) {
crcPerBlock = cpb;
}
//read md5
- final MD5Hash md5 = MD5Hash.read(in);
+ final MD5Hash md5 = new MD5Hash(
+ checksumData.getMd5().toByteArray());
md5.write(md5out);
done = true;
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1138160&r1=1138159&r2=1138160&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java Tue Jun 21 20:09:54 2011
@@ -17,8 +17,7 @@
*/
package org.apache.hadoop.hdfs;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR_ACCESS_TOKEN;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.SUCCESS;
+import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.SUCCESS;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
@@ -47,16 +46,20 @@ import org.apache.hadoop.fs.UnresolvedLi
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
+import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -64,7 +67,6 @@ import org.apache.hadoop.hdfs.server.nam
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException;
@@ -75,6 +77,8 @@ import org.apache.hadoop.util.Progressab
import org.apache.hadoop.util.PureJavaCrc32;
import org.apache.hadoop.util.StringUtils;
+
+
/****************************************************************
* DFSOutputStream creates files from a stream of bytes.
*
@@ -650,7 +654,7 @@ class DFSOutputStream extends FSOutputSu
long seqno = ack.getSeqno();
// processes response status from datanodes.
for (int i = ack.getNumOfReplies()-1; i >=0 && dfsClient.clientRunning; i--) {
- final DataTransferProtocol.Status reply = ack.getReply(i);
+ final Status reply = ack.getReply(i);
if (reply != SUCCESS) {
errorIndex = i; // first bad datanode
throw new IOException("Bad response " + reply +
@@ -843,12 +847,14 @@ class DFSOutputStream extends FSOutputSu
DataNode.SMALL_BUFFER_SIZE));
//send the TRANSFER_BLOCK request
- DataTransferProtocol.Sender.opTransferBlock(out, block,
+ Sender.opTransferBlock(out, block,
dfsClient.clientName, targets, blockToken);
//ack
in = new DataInputStream(NetUtils.getInputStream(sock));
- if (SUCCESS != DataTransferProtocol.Status.read(in)) {
+ BlockOpResponseProto response =
+ BlockOpResponseProto.parseFrom(HdfsProtoUtil.vintPrefixed(in));
+ if (SUCCESS != response.getStatus()) {
throw new IOException("Failed to add a datanode");
}
} finally {
@@ -990,7 +996,7 @@ class DFSOutputStream extends FSOutputSu
//
private boolean createBlockOutputStream(DatanodeInfo[] nodes, long newGS,
boolean recoveryFlag) {
- DataTransferProtocol.Status pipelineStatus = SUCCESS;
+ Status pipelineStatus = SUCCESS;
String firstBadLink = "";
if (DFSClient.LOG.isDebugEnabled()) {
for (int i = 0; i < nodes.length; i++) {
@@ -1015,7 +1021,7 @@ class DFSOutputStream extends FSOutputSu
blockReplyStream = new DataInputStream(NetUtils.getInputStream(s));
// send the request
- DataTransferProtocol.Sender.opWriteBlock(out, block,
+ Sender.opWriteBlock(out, block,
nodes.length, recoveryFlag ? stage.getRecoveryStage() : stage, newGS,
block.getNumBytes(), bytesSent, dfsClient.clientName, null, nodes,
accessToken);
@@ -1023,10 +1029,13 @@ class DFSOutputStream extends FSOutputSu
out.flush();
// receive ack for connect
- pipelineStatus = DataTransferProtocol.Status.read(blockReplyStream);
- firstBadLink = Text.readString(blockReplyStream);
+ BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
+ HdfsProtoUtil.vintPrefixed(blockReplyStream));
+ pipelineStatus = resp.getStatus();
+ firstBadLink = resp.getFirstBadLink();
+
if (pipelineStatus != SUCCESS) {
- if (pipelineStatus == ERROR_ACCESS_TOKEN) {
+ if (pipelineStatus == Status.ERROR_ACCESS_TOKEN) {
throw new InvalidBlockTokenException(
"Got access token error for connect ack with firstBadLink as "
+ firstBadLink);
@@ -1401,9 +1410,8 @@ class DFSOutputStream extends FSOutputSu
}
/**
- * flushes out to all replicas of the block.
- * The data is in the buffers of the DNs
- * but not neccessary on the DN's OS buffers.
+ * Flushes out to all replicas of the block. The data is in the buffers
+ * of the DNs but not necessarily in the DN's OS buffers.
*
* It is a synchronous operation. When it returns,
* it guarantees that flushed data become visible to new readers.
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/HftpFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/HftpFileSystem.java?rev=1138160&r1=1138159&r2=1138160&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/HftpFileSystem.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/HftpFileSystem.java Tue Jun 21 20:09:54 2011
@@ -190,9 +190,6 @@ public class HftpFileSystem extends File
}
}
- //Renew TGT if needed
- ugi.reloginFromKeytab();
-
//since we don't already have a token, go get one over https
if (delegationToken == null) {
delegationToken =
@@ -204,8 +201,10 @@ public class HftpFileSystem extends File
@Override
- public Token<?> getDelegationToken(final String renewer) throws IOException {
+ public synchronized Token<?> getDelegationToken(final String renewer) throws IOException {
try {
+ //Renew TGT if needed
+ ugi.reloginFromKeytab();
return ugi.doAs(new PrivilegedExceptionAction<Token<?>>() {
public Token<?> run() throws IOException {
Credentials c;
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java?rev=1138160&r1=1138159&r2=1138160&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java Tue Jun 21 20:09:54 2011
@@ -159,6 +159,11 @@ public class DatanodeInfo extends Datano
public void setCapacity(long capacity) {
this.capacity = capacity;
}
+
+ /** Sets the used space for the datanode. */
+ public void setDfsUsed(long dfsUsed) {
+ this.dfsUsed = dfsUsed;
+ }
/** Sets raw free space. */
public void setRemaining(long remaining) {
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java?rev=1138160&r1=1138159&r2=1138160&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java Tue Jun 21 20:09:54 2011
@@ -108,6 +108,30 @@ public class LayoutVersion {
this.ancestorLV = ancestorLV;
this.description = description;
}
+
+ /**
+ * Accessor method for feature layout version
+ * @return int lv value
+ */
+ public int getLayoutVersion() {
+ return lv;
+ }
+
+ /**
+ * Accessor method for feature ancestor layout version
+ * @return int ancestor LV value
+ */
+ public int getAncestorLayoutVersion() {
+ return ancestorLV;
+ }
+
+ /**
+ * Accessor method for feature description
+ * @return String feature description
+ */
+ public String getDescription() {
+ return description;
+ }
}
// Build layout version and corresponding feature matrix
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1138160&r1=1138159&r2=1138160&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Tue Jun 21 20:09:54 2011
@@ -17,7 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.balancer;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR_ACCESS_TOKEN;
+import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
@@ -53,11 +53,13 @@ import org.apache.hadoop.conf.Configured
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.Util;
@@ -346,15 +348,16 @@ public class Balancer {
private void sendRequest(DataOutputStream out) throws IOException {
final ExtendedBlock eb = new ExtendedBlock(nnc.blockpoolID, block.getBlock());
final Token<BlockTokenIdentifier> accessToken = nnc.getAccessToken(eb);
- DataTransferProtocol.Sender.opReplaceBlock(out, eb, source.getStorageID(),
+ Sender.opReplaceBlock(out, eb, source.getStorageID(),
proxySource.getDatanode(), accessToken);
}
/* Receive a block copy response from the input stream */
private void receiveResponse(DataInputStream in) throws IOException {
- DataTransferProtocol.Status status = DataTransferProtocol.Status.read(in);
- if (status != DataTransferProtocol.Status.SUCCESS) {
- if (status == ERROR_ACCESS_TOKEN)
+ BlockOpResponseProto response = BlockOpResponseProto.parseFrom(
+ vintPrefixed(in));
+ if (response.getStatus() != Status.SUCCESS) {
+ if (response.getStatus() == Status.ERROR_ACCESS_TOKEN)
throw new IOException("block move failed due to access token error");
throw new IOException("block move is failed");
}
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1138160&r1=1138159&r2=1138160&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Tue Jun 21 20:09:54 2011
@@ -17,8 +17,6 @@
*/
package org.apache.hadoop.hdfs.server.datanode;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.SUCCESS;
import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT;
import java.io.BufferedOutputStream;
@@ -36,14 +34,14 @@ import java.util.zip.Checksum;
import org.apache.commons.logging.Log;
import org.apache.hadoop.fs.FSOutputSummer;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
@@ -996,13 +994,13 @@ class BlockReceiver implements Closeable
Status[] replies = null;
if (mirrorError) { // ack read error
replies = new Status[2];
- replies[0] = SUCCESS;
- replies[1] = ERROR;
+ replies[0] = Status.SUCCESS;
+ replies[1] = Status.ERROR;
} else {
short ackLen = type == PacketResponderType.LAST_IN_PIPELINE? 0
: ack.getNumOfReplies();
replies = new Status[1+ackLen];
- replies[0] = SUCCESS;
+ replies[0] = Status.SUCCESS;
for (int i=0; i<ackLen; i++) {
replies[i+1] = ack.getReply(i);
}
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=1138160&r1=1138159&r2=1138160&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Tue Jun 21 20:09:54 2011
@@ -33,7 +33,7 @@ import org.apache.commons.logging.Log;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.FSConstants;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.SocketOutputStream;
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1138160&r1=1138159&r2=1138160&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Tue Jun 21 20:09:54 2011
@@ -18,9 +18,32 @@
package org.apache.hadoop.hdfs.server.datanode;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR_ACCESS_TOKEN;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.SUCCESS;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ADMIN;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STORAGEID_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_FEDERATION_NAMESERVICES;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.server.common.Util.now;
import java.io.BufferedOutputStream;
@@ -68,15 +91,19 @@ import org.apache.hadoop.hdfs.HdfsConfig
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
+import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DNTransferAckProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
@@ -133,6 +160,7 @@ import org.apache.hadoop.util.StringUtil
import org.apache.hadoop.util.VersionInfo;
import org.mortbay.util.ajax.JSON;
+
/**********************************************************
* DataNode is a class (and program) that stores a set of
* blocks for a DFS deployment. A single deployment can
@@ -1397,6 +1425,10 @@ public class DataNode extends Configured
return blockPoolManager.getAllNamenodeThreads();
}
+ int getBpOsCount() {
+ return blockPoolManager.getAllNamenodeThreads().length;
+ }
+
/**
* Initializes the {@link #data}. The initialization is done only once, when
* handshake with the first namenode is completed.
@@ -1945,7 +1977,7 @@ public class DataNode extends Configured
EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE));
}
- DataTransferProtocol.Sender.opWriteBlock(out,
+ Sender.opWriteBlock(out,
b, 0, stage, 0, 0, 0, clientname, srcNode, targets, accessToken);
// send data & checksum
@@ -1958,12 +1990,13 @@ public class DataNode extends Configured
// read ack
if (isClient) {
in = new DataInputStream(NetUtils.getInputStream(sock));
- final DataTransferProtocol.Status s = DataTransferProtocol.Status.read(in);
+ DNTransferAckProto closeAck = DNTransferAckProto.parseFrom(
+ HdfsProtoUtil.vintPrefixed(in));
if (LOG.isDebugEnabled()) {
- LOG.debug(getClass().getSimpleName() + ": close-ack=" + s);
+ LOG.debug(getClass().getSimpleName() + ": close-ack=" + closeAck);
}
- if (s != SUCCESS) {
- if (s == ERROR_ACCESS_TOKEN) {
+ if (closeAck.getStatus() != Status.SUCCESS) {
+ if (closeAck.getStatus() == Status.ERROR_ACCESS_TOKEN) {
throw new InvalidBlockTokenException(
"Got access token error for connect ack, targets="
+ Arrays.asList(targets));
@@ -2105,6 +2138,10 @@ public class DataNode extends Configured
while (shouldRun) {
try {
blockPoolManager.joinAll();
+ if (blockPoolManager.getAllNamenodeThreads() != null
+ && blockPoolManager.getAllNamenodeThreads().length == 0) {
+ shouldRun = false;
+ }
Thread.sleep(2000);
} catch (InterruptedException ex) {
LOG.warn("Received exception in Datanode#join: " + ex);
@@ -2243,6 +2280,13 @@ public class DataNode extends Configured
} catch (Throwable e) {
LOG.error(StringUtils.stringifyException(e));
System.exit(-1);
+ } finally {
+ // We need to call System.exit here because either shutdown was called or
+ // a disk-related condition (volumes tolerated or volumes required) was
+ // not met. Also, in secure mode, control goes to Jsvc and the Datanode
+ // process hangs without System.exit.
+ LOG.warn("Exiting Datanode");
+ System.exit(0);
}
}
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1138160&r1=1138159&r2=1138160&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Tue Jun 21 20:09:54 2011
@@ -17,9 +17,9 @@
*/
package org.apache.hadoop.hdfs.server.datanode;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR_ACCESS_TOKEN;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.SUCCESS;
+import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR;
+import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_ACCESS_TOKEN;
+import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.SUCCESS;
import static org.apache.hadoop.hdfs.server.common.Util.now;
import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT;
@@ -39,11 +39,18 @@ import java.util.Arrays;
import org.apache.commons.logging.Log;
import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Receiver;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
@@ -51,20 +58,19 @@ import org.apache.hadoop.hdfs.server.dat
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MD5Hash;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.metrics2.lib.MutableCounterLong;
-import org.apache.hadoop.metrics2.lib.MutableRate;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.StringUtils;
+import com.google.protobuf.ByteString;
+
+
/**
* Thread for processing incoming/outgoing data stream.
*/
-class DataXceiver extends DataTransferProtocol.Receiver
- implements Runnable, FSConstants {
+class DataXceiver extends Receiver implements Runnable, FSConstants {
public static final Log LOG = DataNode.LOG;
static final Log ClientTraceLog = DataNode.ClientTraceLog;
@@ -123,6 +129,7 @@ class DataXceiver extends DataTransferPr
DataInputStream in=null;
int opsProcessed = 0;
+ Op op = null;
try {
in = new DataInputStream(
new BufferedInputStream(NetUtils.getInputStream(s),
@@ -133,7 +140,6 @@ class DataXceiver extends DataTransferPr
// This optimistic behaviour allows the other end to reuse connections.
// Setting keepalive timeout to 0 disable this behavior.
do {
- DataTransferProtocol.Op op;
try {
if (opsProcessed != 0) {
assert socketKeepaliveTimeout > 0;
@@ -172,10 +178,12 @@ class DataXceiver extends DataTransferPr
opStartTime = now();
processOp(op, in);
++opsProcessed;
- } while (s.isConnected() && socketKeepaliveTimeout > 0);
+ } while (!s.isClosed() && socketKeepaliveTimeout > 0);
} catch (Throwable t) {
- LOG.error(datanode.getMachineName() + ":DataXceiver, at " +
- s.toString(), t);
+ LOG.error(datanode.getMachineName() + ":DataXceiver error processing " +
+ ((op == null) ? "unknown" : op.name()) + " operation " +
+ " src: " + remoteAddress +
+ " dest: " + localAddress, t);
} finally {
if (LOG.isDebugEnabled()) {
LOG.debug(datanode.getMachineName() + ":Number of active connections is: "
@@ -200,8 +208,7 @@ class DataXceiver extends DataTransferPr
DataOutputStream out = new DataOutputStream(
new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
checkAccess(out, true, block, blockToken,
- DataTransferProtocol.Op.READ_BLOCK,
- BlockTokenSecretManager.AccessMode.READ);
+ Op.READ_BLOCK, BlockTokenSecretManager.AccessMode.READ);
// send the block
BlockSender blockSender = null;
@@ -213,7 +220,7 @@ class DataXceiver extends DataTransferPr
"%d", "HDFS_READ", clientName, "%d",
dnR.getStorageID(), block, "%d")
: dnR + " Served block " + block + " to " +
- s.getInetAddress();
+ remoteAddress;
updateCurrentThreadName("Sending block " + block);
try {
@@ -221,19 +228,23 @@ class DataXceiver extends DataTransferPr
blockSender = new BlockSender(block, startOffset, length,
true, true, false, datanode, clientTraceFmt);
} catch(IOException e) {
+ LOG.info("opReadBlock " + block + " received exception " + e);
sendResponse(s, ERROR, datanode.socketWriteTimeout);
throw e;
}
+
+ // send op status
+ sendResponse(s, SUCCESS, datanode.socketWriteTimeout);
- SUCCESS.write(out); // send op status
long read = blockSender.sendBlock(out, baseStream, null); // send data
if (blockSender.didSendEntireByteRange()) {
// If we sent the entire range, then we should expect the client
// to respond with a Status enum.
try {
- DataTransferProtocol.Status stat = DataTransferProtocol.Status.read(in);
- if (stat == null) {
+ ClientReadStatusProto stat = ClientReadStatusProto.parseFrom(
+ HdfsProtoUtil.vintPrefixed(in));
+ if (!stat.hasStatus()) {
LOG.warn("Client " + s.getInetAddress() + " did not send a valid status " +
"code after reading. Will close connection.");
IOUtils.closeStream(out);
@@ -248,6 +259,10 @@ class DataXceiver extends DataTransferPr
datanode.metrics.incrBytesRead((int) read);
datanode.metrics.incrBlocksRead();
} catch ( SocketException ignored ) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(dnR + ":Ignoring exception while serving " + block + " to " +
+ remoteAddress, ignored);
+ }
// Its ok for remote side to close the connection anytime.
datanode.metrics.incrBlocksRead();
IOUtils.closeStream(out);
@@ -257,7 +272,7 @@ class DataXceiver extends DataTransferPr
*/
LOG.warn(dnR + ":Got exception while serving " +
block + " to " +
- s.getInetAddress() + ":\n" +
+ remoteAddress + ":\n" +
StringUtils.stringifyException(ioe) );
throw ioe;
} finally {
@@ -320,8 +335,7 @@ class DataXceiver extends DataTransferPr
NetUtils.getOutputStream(s, datanode.socketWriteTimeout),
SMALL_BUFFER_SIZE));
checkAccess(replyOut, isClient, block, blockToken,
- DataTransferProtocol.Op.WRITE_BLOCK,
- BlockTokenSecretManager.AccessMode.WRITE);
+ Op.WRITE_BLOCK, BlockTokenSecretManager.AccessMode.WRITE);
DataOutputStream mirrorOut = null; // stream to next target
DataInputStream mirrorIn = null; // reply from next target
@@ -329,7 +343,7 @@ class DataXceiver extends DataTransferPr
BlockReceiver blockReceiver = null; // responsible for data handling
String mirrorNode = null; // the name:port of next target
String firstBadLink = ""; // first datanode that failed in connection setup
- DataTransferProtocol.Status mirrorInStatus = SUCCESS;
+ Status mirrorInStatus = SUCCESS;
try {
if (isDatanode ||
stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
@@ -366,7 +380,7 @@ class DataXceiver extends DataTransferPr
SMALL_BUFFER_SIZE));
mirrorIn = new DataInputStream(NetUtils.getInputStream(mirrorSock));
- DataTransferProtocol.Sender.opWriteBlock(mirrorOut, originalBlock,
+ Sender.opWriteBlock(mirrorOut, originalBlock,
pipelineSize, stage, newGs, minBytesRcvd, maxBytesRcvd, clientname,
srcDataNode, targets, blockToken);
@@ -377,8 +391,10 @@ class DataXceiver extends DataTransferPr
// read connect ack (only for clients, not for replication req)
if (isClient) {
- mirrorInStatus = DataTransferProtocol.Status.read(mirrorIn);
- firstBadLink = Text.readString(mirrorIn);
+ BlockOpResponseProto connectAck =
+ BlockOpResponseProto.parseFrom(HdfsProtoUtil.vintPrefixed(mirrorIn));
+ mirrorInStatus = connectAck.getStatus();
+ firstBadLink = connectAck.getFirstBadLink();
if (LOG.isDebugEnabled() || mirrorInStatus != SUCCESS) {
LOG.info("Datanode " + targets.length +
" got response for connect ack " +
@@ -389,8 +405,11 @@ class DataXceiver extends DataTransferPr
} catch (IOException e) {
if (isClient) {
- ERROR.write(replyOut);
- Text.writeString(replyOut, mirrorNode);
+ BlockOpResponseProto.newBuilder()
+ .setStatus(ERROR)
+ .setFirstBadLink(mirrorNode)
+ .build()
+ .writeDelimitedTo(replyOut);
replyOut.flush();
}
IOUtils.closeStream(mirrorOut);
@@ -400,6 +419,8 @@ class DataXceiver extends DataTransferPr
IOUtils.closeSocket(mirrorSock);
mirrorSock = null;
if (isClient) {
+ LOG.error(datanode + ":Exception transfering block " +
+ block + " to mirror " + mirrorNode + ": " + e);
throw e;
} else {
LOG.info(datanode + ":Exception transfering block " +
@@ -417,8 +438,11 @@ class DataXceiver extends DataTransferPr
" forwarding connect ack to upstream firstbadlink is " +
firstBadLink);
}
- mirrorInStatus.write(replyOut);
- Text.writeString(replyOut, firstBadLink);
+ BlockOpResponseProto.newBuilder()
+ .setStatus(mirrorInStatus)
+ .setFirstBadLink(firstBadLink)
+ .build()
+ .writeDelimitedTo(replyOut);
replyOut.flush();
}
@@ -433,7 +457,7 @@ class DataXceiver extends DataTransferPr
if (LOG.isTraceEnabled()) {
LOG.trace("TRANSFER: send close-ack");
}
- SUCCESS.write(replyOut);
+ writeResponse(SUCCESS, replyOut);
}
}
@@ -458,7 +482,7 @@ class DataXceiver extends DataTransferPr
} catch (IOException ioe) {
- LOG.info("writeBlock " + block + " received exception " + ioe);
+ LOG.info("opWriteBlock " + block + " received exception " + ioe);
throw ioe;
} finally {
// close all opened streams
@@ -480,21 +504,20 @@ class DataXceiver extends DataTransferPr
final DatanodeInfo[] targets,
final Token<BlockTokenIdentifier> blockToken) throws IOException {
checkAccess(null, true, blk, blockToken,
- DataTransferProtocol.Op.TRANSFER_BLOCK,
- BlockTokenSecretManager.AccessMode.COPY);
+ Op.TRANSFER_BLOCK, BlockTokenSecretManager.AccessMode.COPY);
- updateCurrentThreadName(DataTransferProtocol.Op.TRANSFER_BLOCK + " " + blk);
+ updateCurrentThreadName(Op.TRANSFER_BLOCK + " " + blk);
final DataOutputStream out = new DataOutputStream(
NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
try {
datanode.transferReplicaForPipelineRecovery(blk, targets, client);
- SUCCESS.write(out);
+ writeResponse(Status.SUCCESS, out);
} finally {
IOUtils.closeStream(out);
}
}
-
+
/**
* Get block checksum (MD5 of CRC32).
*/
@@ -504,8 +527,7 @@ class DataXceiver extends DataTransferPr
final DataOutputStream out = new DataOutputStream(
NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
checkAccess(out, true, block, blockToken,
- DataTransferProtocol.Op.BLOCK_CHECKSUM,
- BlockTokenSecretManager.AccessMode.READ);
+ Op.BLOCK_CHECKSUM, BlockTokenSecretManager.AccessMode.READ);
updateCurrentThreadName("Reading metadata for block " + block);
final MetaDataInputStream metadataIn =
datanode.data.getMetaDataInputStream(block);
@@ -530,10 +552,15 @@ class DataXceiver extends DataTransferPr
}
//write reply
- SUCCESS.write(out);
- out.writeInt(bytesPerCRC);
- out.writeLong(crcPerBlock);
- md5.write(out);
+ BlockOpResponseProto.newBuilder()
+ .setStatus(SUCCESS)
+ .setChecksumResponse(OpBlockChecksumResponseProto.newBuilder()
+ .setBytesPerCrc(bytesPerCRC)
+ .setCrcPerBlock(crcPerBlock)
+ .setMd5(ByteString.copyFrom(md5.getDigest()))
+ )
+ .build()
+ .writeDelimitedTo(out);
out.flush();
} finally {
IOUtils.closeStream(out);
@@ -590,7 +617,7 @@ class DataXceiver extends DataTransferPr
baseStream, SMALL_BUFFER_SIZE));
// send status first
- SUCCESS.write(reply);
+ writeResponse(SUCCESS, reply);
// send block content to the target
long read = blockSender.sendBlock(reply, baseStream,
dataXceiverServer.balanceThrottler);
@@ -601,6 +628,7 @@ class DataXceiver extends DataTransferPr
LOG.info("Copied block " + block + " to " + s.getRemoteSocketAddress());
} catch (IOException ioe) {
isOpSuccess = false;
+ LOG.info("opCopyBlock " + block + " received exception " + ioe);
throw ioe;
} finally {
dataXceiverServer.balanceThrottler.release();
@@ -653,7 +681,7 @@ class DataXceiver extends DataTransferPr
Socket proxySock = null;
DataOutputStream proxyOut = null;
- DataTransferProtocol.Status opStatus = SUCCESS;
+ Status opStatus = SUCCESS;
BlockReceiver blockReceiver = null;
DataInputStream proxyReply = null;
@@ -671,15 +699,16 @@ class DataXceiver extends DataTransferPr
new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
/* send request to the proxy */
- DataTransferProtocol.Sender.opCopyBlock(proxyOut, block, blockToken);
+ Sender.opCopyBlock(proxyOut, block, blockToken);
// receive the response from the proxy
proxyReply = new DataInputStream(new BufferedInputStream(
NetUtils.getInputStream(proxySock), BUFFER_SIZE));
- final DataTransferProtocol.Status status
- = DataTransferProtocol.Status.read(proxyReply);
- if (status != SUCCESS) {
- if (status == ERROR_ACCESS_TOKEN) {
+ BlockOpResponseProto copyResponse = BlockOpResponseProto.parseFrom(
+ HdfsProtoUtil.vintPrefixed(proxyReply));
+
+ if (copyResponse.getStatus() != SUCCESS) {
+ if (copyResponse.getStatus() == ERROR_ACCESS_TOKEN) {
throw new IOException("Copy block " + block + " from "
+ proxySock.getRemoteSocketAddress()
+ " failed due to access token error");
@@ -705,6 +734,7 @@ class DataXceiver extends DataTransferPr
} catch (IOException ioe) {
opStatus = ERROR;
+ LOG.info("opReplaceBlock " + block + " received exception " + ioe);
throw ioe;
} finally {
// receive the last byte that indicates the proxy released its thread resource
@@ -737,29 +767,35 @@ class DataXceiver extends DataTransferPr
return now() - opStartTime;
}
- private void updateCounter(MutableCounterLong localCounter,
- MutableCounterLong remoteCounter) {
- (isLocal? localCounter: remoteCounter).incr();
- }
-
/**
* Utility function for sending a response.
* @param s socket to write to
* @param opStatus status message to write
* @param timeout send timeout
**/
- private void sendResponse(Socket s, DataTransferProtocol.Status opStatus,
+ private void sendResponse(Socket s, Status status,
long timeout) throws IOException {
DataOutputStream reply =
new DataOutputStream(NetUtils.getOutputStream(s, timeout));
- opStatus.write(reply);
- reply.flush();
+
+ writeResponse(status, reply);
}
+
+ private void writeResponse(Status status, OutputStream out)
+ throws IOException {
+ BlockOpResponseProto response = BlockOpResponseProto.newBuilder()
+ .setStatus(status)
+ .build();
+
+ response.writeDelimitedTo(out);
+ out.flush();
+ }
+
private void checkAccess(DataOutputStream out, final boolean reply,
final ExtendedBlock blk,
final Token<BlockTokenIdentifier> t,
- final DataTransferProtocol.Op op,
+ final Op op,
final BlockTokenSecretManager.AccessMode mode) throws IOException {
if (datanode.isBlockTokenEnabled) {
try {
@@ -771,12 +807,15 @@ class DataXceiver extends DataTransferPr
out = new DataOutputStream(
NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
}
- ERROR_ACCESS_TOKEN.write(out);
+
+ BlockOpResponseProto.Builder resp = BlockOpResponseProto.newBuilder()
+ .setStatus(ERROR_ACCESS_TOKEN);
if (mode == BlockTokenSecretManager.AccessMode.WRITE) {
DatanodeRegistration dnR =
datanode.getDNRegistrationForBP(blk.getBlockPoolId());
- Text.writeString(out, dnR.getName());
+ resp.setFirstBadLink(dnR.getName());
}
+ resp.build().writeDelimitedTo(out);
out.flush();
}
LOG.warn("Block token verification failed: op=" + op
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java?rev=1138160&r1=1138159&r2=1138160&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java Tue Jun 21 20:09:54 2011
@@ -47,7 +47,7 @@ class DataXceiverServer implements Runna
ServerSocket ss;
DataNode datanode;
- // Record all sockets opend for data transfer
+ // Record all sockets opened for data transfer
Map<Socket, Socket> childSockets = Collections.synchronizedMap(
new HashMap<Socket, Socket>());
@@ -140,19 +140,18 @@ class DataXceiverServer implements Runna
} catch (SocketTimeoutException ignored) {
// wake up to see if should continue to run
} catch (IOException ie) {
- LOG.warn(datanode.getMachineName() + ":DataXceiveServer: "
- + StringUtils.stringifyException(ie));
+ LOG.warn(datanode.getMachineName() + ":DataXceiveServer: ", ie);
} catch (Throwable te) {
- LOG.error(datanode.getMachineName() + ":DataXceiveServer: Exiting due to:"
- + StringUtils.stringifyException(te));
+ LOG.error(datanode.getMachineName()
+ + ":DataXceiveServer: Exiting due to: ", te);
datanode.shouldRun = false;
}
}
try {
ss.close();
} catch (IOException ie) {
- LOG.warn(datanode.getMachineName() + ":DataXceiveServer: "
- + StringUtils.stringifyException(ie));
+ LOG.warn(datanode.getMachineName()
+ + ":DataXceiveServer: Close exception due to: ", ie);
}
}
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java?rev=1138160&r1=1138159&r2=1138160&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java Tue Jun 21 20:09:54 2011
@@ -241,6 +241,11 @@ public class DatanodeJspHelper {
}
datanodePort = Integer.parseInt(datanodePortStr);
+ final Long genStamp = JspHelper.validateLong(req.getParameter("genstamp"));
+ if (genStamp == null) {
+ out.print("Invalid input (genstamp absent)");
+ return;
+ }
String namenodeInfoPortStr = req.getParameter("namenodeInfoPort");
int namenodeInfoPort = -1;
if (namenodeInfoPortStr != null)
@@ -322,6 +327,8 @@ public class DatanodeJspHelper {
+ startOffset + "\">");
out.print("<input type=\"hidden\" name=\"filename\" value=\"" + filename
+ "\">");
+ out.print("<input type=\"hidden\" name=\"genstamp\" value=\"" + genStamp
+ + "\">");
out.print("<input type=\"hidden\" name=\"datanodePort\" value=\""
+ datanodePort + "\">");
out.print("<input type=\"hidden\" name=\"namenodeInfoPort\" value=\""
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=1138160&r1=1138159&r2=1138160&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Tue Jun 21 20:09:54 2011
@@ -879,7 +879,8 @@ public class FSDataset implements FSCons
if (removedVols == null) {
removedVols = new ArrayList<FSVolume>(1);
}
- removedVols.add(volumeList.get(idx));
+ removedVols.add(fsv);
+ fsv.shutdown();
volumeList.set(idx, null); // Remove the volume
numFailedVolumes++;
}
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java?rev=1138160&r1=1138159&r2=1138160&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java Tue Jun 21 20:09:54 2011
@@ -189,7 +189,7 @@ public class BackupNode extends NameNode
@Override // NamenodeProtocol
public NamenodeRegistration register(NamenodeRegistration registration
) throws IOException {
- throw new UnsupportedActionException("journal");
+ throw new UnsupportedActionException("register");
}
@Override // NamenodeProtocol
Modified: hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java?rev=1138160&r1=1138159&r2=1138160&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java (original)
+++ hadoop/common/branches/HDFS-1073/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java Tue Jun 21 20:09:54 2011
@@ -570,6 +570,7 @@ public class BlockManager {
* dumps the contents of recentInvalidateSets
*/
private void dumpRecentInvalidateSets(PrintWriter out) {
+ assert namesystem.hasWriteLock();
int size = recentInvalidateSets.values().size();
out.println("Metasave: Blocks " + pendingDeletionBlocksCount
+ " waiting deletion from " + size + " datanodes.");
@@ -1258,7 +1259,7 @@ public class BlockManager {
// Ignore replicas already scheduled to be removed from the DN
if(belongsToInvalidates(dn.getStorageID(), block)) {
assert storedBlock.findDatanode(dn) < 0 : "Block " + block
- + " in recentInvalidatesSet should not appear in DN " + this;
+ + " in recentInvalidatesSet should not appear in DN " + dn;
return storedBlock;
}
@@ -1392,9 +1393,9 @@ public class BlockManager {
DatanodeDescriptor delNodeHint,
boolean logEveryBlock)
throws IOException {
- assert (block != null && namesystem.hasWriteLock());
+ assert block != null && namesystem.hasWriteLock();
BlockInfo storedBlock;
- if (block.getClass() == BlockInfoUnderConstruction.class) {
+ if (block instanceof BlockInfoUnderConstruction) {
//refresh our copy in case the block got completed in another thread
storedBlock = blocksMap.getStoredBlock(block);
} else {
@@ -1571,6 +1572,7 @@ public class BlockManager {
*/
void processOverReplicatedBlock(Block block, short replication,
DatanodeDescriptor addedNode, DatanodeDescriptor delNodeHint) {
+ assert namesystem.hasWriteLock();
if (addedNode == delNodeHint) {
delNodeHint = null;
}
@@ -1596,6 +1598,7 @@ public class BlockManager {
}
void addToExcessReplicate(DatanodeInfo dn, Block block) {
+ assert namesystem.hasWriteLock();
Collection<Block> excessBlocks = excessReplicateMap.get(dn.getStorageID());
if (excessBlocks == null) {
excessBlocks = new TreeSet<Block>();