You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2009/08/05 06:48:31 UTC
svn commit: r801057 - in /hadoop/hdfs/trunk: CHANGES.txt
src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
Author: szetszwo
Date: Wed Aug 5 04:48:30 2009
New Revision: 801057
URL: http://svn.apache.org/viewvc?rev=801057&view=rev
Log:
HDFS-524. Further DataTransferProtocol code refactoring.
Modified:
hadoop/hdfs/trunk/CHANGES.txt
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=801057&r1=801056&r2=801057&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Wed Aug 5 04:48:30 2009
@@ -18,6 +18,9 @@
HDFS-381. Remove blocks from DataNode maps when corresponding file
is deleted. (Suresh Srinivas via rangadi)
+ HDFS-377. Separate codes which implement DataTransferProtocol.
+ (szetszwo)
+
HDFS-396. NameNode image and edits directories are specified as URIs.
(Luca Telloli via rangadi)
@@ -47,9 +50,14 @@
only by the run-test-*-faul-inject targets. (Konstantin Boudnik via
szetszwo)
+ HDFS-446. Improvements to Offline Image Viewer. (Jakob Homan via shv)
+
HADOOP-6160. Fix releaseaudit target to run on specific directories.
(gkesavan)
+ HDFS-501. Use enum to define the constants in DataTransferProtocol.
+ (szetszwo)
+
HDFS-508. Factor out BlockInfo from BlocksMap. (shv)
HDFS-510. Rename DatanodeBlockInfo to be ReplicaInfo.
@@ -68,11 +76,11 @@
HDFS-504. Update the modification time of a file when the file
is closed. (Chun Zhang via dhruba)
- HDFS-446. Improvements to Offline Image Viewer. (Jakob Homan via shv)
-
HDFS-498. Add development guide and documentation for the fault injection
framework. (Konstantin Boudnik via szetszwo)
+ HDFS-524. Further DataTransferProtocol code refactoring. (szetszwo)
+
BUG FIXES
HDFS-76. Better error message to users when commands fail because of
lack of quota. Allow quota to be set even if the limit is lower than
@@ -81,9 +89,6 @@
HADOOP-4687. HDFS is split from Hadoop Core. It is a subproject under
Hadoop (Owen O'Malley)
- HDFS-377. Separate codes which implement DataTransferProtocol.
- (szetszwo)
-
HADOOP-6096. Fix Eclipse project and classpath files following project
split. (tomwhite)
@@ -120,9 +125,6 @@
HDFS-484. Fix bin-package and package target to package jar files.
(gkesavan)
- HDFS-501. Use enum to define the constants in DataTransferProtocol.
- (szetszwo)
-
HDFS-490. Eliminate the deprecated warnings introduced by H-5438.
(He Yongqiang via szetszwo)
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java?rev=801057&r1=801056&r2=801057&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java Wed Aug 5 04:48:30 2009
@@ -249,8 +249,8 @@
/** Receiver */
public static abstract class Receiver {
- /** Initialize a operation. */
- public final Op op(DataInputStream in) throws IOException {
+ /** Read an Op. It also checks protocol version. */
+ protected final Op readOp(DataInputStream in) throws IOException {
final short version = in.readShort();
if (version != DATA_TRANSFER_VERSION) {
throw new IOException( "Version Mismatch" );
@@ -258,8 +258,32 @@
return Op.read(in);
}
+ /** Process op by the corresponding method. */
+ protected final void processOp(Op op, DataInputStream in
+ ) throws IOException {
+ switch(op) {
+ case READ_BLOCK:
+ opReadBlock(in);
+ break;
+ case WRITE_BLOCK:
+ opWriteBlock(in);
+ break;
+ case REPLACE_BLOCK:
+ opReplaceBlock(in);
+ break;
+ case COPY_BLOCK:
+ opCopyBlock(in);
+ break;
+ case BLOCK_CHECKSUM:
+ opBlockChecksum(in);
+ break;
+ default:
+ throw new IOException("Unknown op " + op + " in data stream");
+ }
+ }
+
/** Receive OP_READ_BLOCK */
- public final void opReadBlock(DataInputStream in) throws IOException {
+ private void opReadBlock(DataInputStream in) throws IOException {
final long blockId = in.readLong();
final long blockGs = in.readLong();
final long offset = in.readLong();
@@ -270,13 +294,16 @@
opReadBlock(in, blockId, blockGs, offset, length, client, accesstoken);
}
- /** Abstract OP_READ_BLOCK method. */
- public abstract void opReadBlock(DataInputStream in,
+ /**
+ * Abstract OP_READ_BLOCK method.
+ * Read a block.
+ */
+ protected abstract void opReadBlock(DataInputStream in,
long blockId, long blockGs, long offset, long length,
String client, AccessToken accesstoken) throws IOException;
/** Receive OP_WRITE_BLOCK */
- public final void opWriteBlock(DataInputStream in) throws IOException {
+ private void opWriteBlock(DataInputStream in) throws IOException {
final long blockId = in.readLong();
final long blockGs = in.readLong();
final int pipelineSize = in.readInt(); // num of datanodes in entire pipeline
@@ -298,14 +325,17 @@
client, src, targets, accesstoken);
}
- /** Abstract OP_WRITE_BLOCK method. */
- public abstract void opWriteBlock(DataInputStream in,
+ /**
+ * Abstract OP_WRITE_BLOCK method.
+ * Write a block.
+ */
+ protected abstract void opWriteBlock(DataInputStream in,
long blockId, long blockGs, int pipelineSize, boolean isRecovery,
String client, DatanodeInfo src, DatanodeInfo[] targets,
AccessToken accesstoken) throws IOException;
/** Receive OP_REPLACE_BLOCK */
- public final void opReplaceBlock(DataInputStream in) throws IOException {
+ private void opReplaceBlock(DataInputStream in) throws IOException {
final long blockId = in.readLong();
final long blockGs = in.readLong();
final String sourceId = Text.readString(in); // read del hint
@@ -315,13 +345,16 @@
opReplaceBlock(in, blockId, blockGs, sourceId, src, accesstoken);
}
- /** Abstract OP_REPLACE_BLOCK method. */
- public abstract void opReplaceBlock(DataInputStream in,
+ /**
+ * Abstract OP_REPLACE_BLOCK method.
+ * It is used for balancing purpose; send to a destination
+ */
+ protected abstract void opReplaceBlock(DataInputStream in,
long blockId, long blockGs, String sourceId, DatanodeInfo src,
AccessToken accesstoken) throws IOException;
/** Receive OP_COPY_BLOCK */
- public final void opCopyBlock(DataInputStream in) throws IOException {
+ private void opCopyBlock(DataInputStream in) throws IOException {
final long blockId = in.readLong();
final long blockGs = in.readLong();
final AccessToken accesstoken = readAccessToken(in);
@@ -329,12 +362,15 @@
opCopyBlock(in, blockId, blockGs, accesstoken);
}
- /** Abstract OP_COPY_BLOCK method. */
- public abstract void opCopyBlock(DataInputStream in,
+ /**
+ * Abstract OP_COPY_BLOCK method.
+ * It is used for balancing purpose; send to a proxy source.
+ */
+ protected abstract void opCopyBlock(DataInputStream in,
long blockId, long blockGs, AccessToken accesstoken) throws IOException;
/** Receive OP_BLOCK_CHECKSUM */
- public final void opBlockChecksum(DataInputStream in) throws IOException {
+ private void opBlockChecksum(DataInputStream in) throws IOException {
final long blockId = in.readLong();
final long blockGs = in.readLong();
final AccessToken accesstoken = readAccessToken(in);
@@ -342,8 +378,11 @@
opBlockChecksum(in, blockId, blockGs, accesstoken);
}
- /** Abstract OP_BLOCK_CHECKSUM method. */
- public abstract void opBlockChecksum(DataInputStream in,
+ /**
+ * Abstract OP_BLOCK_CHECKSUM method.
+ * Get the checksum of a block
+ */
+ protected abstract void opBlockChecksum(DataInputStream in,
long blockId, long blockGs, AccessToken accesstoken) throws IOException;
/** Read an AccessToken */
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=801057&r1=801056&r2=801057&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Wed Aug 5 04:48:30 2009
@@ -43,6 +43,8 @@
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.metrics.util.MetricsTimeVaryingInt;
+import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessToken;
import org.apache.hadoop.security.AccessTokenHandler;
@@ -57,22 +59,29 @@
public static final Log LOG = DataNode.LOG;
static final Log ClientTraceLog = DataNode.ClientTraceLog;
- Socket s;
- final String remoteAddress; // address of remote side
- final String localAddress; // local address of this daemon
- DataNode datanode;
- DataXceiverServer dataXceiverServer;
+ private final Socket s;
+ private final boolean isLocal; //is a local connection?
+ private final String remoteAddress; // address of remote side
+ private final String localAddress; // local address of this daemon
+ private final DataNode datanode;
+ private final DataXceiverServer dataXceiverServer;
+
+ private long opStartTime; //the start time of receiving an Op
public DataXceiver(Socket s, DataNode datanode,
DataXceiverServer dataXceiverServer) {
-
this.s = s;
+ this.isLocal = s.getInetAddress().equals(s.getLocalAddress());
this.datanode = datanode;
this.dataXceiverServer = dataXceiverServer;
dataXceiverServer.childSockets.put(s, s);
remoteAddress = s.getRemoteSocketAddress().toString();
localAddress = s.getLocalSocketAddress().toString();
- LOG.debug("Number of active connections is: " + datanode.getXceiverCount());
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Number of active connections is: "
+ + datanode.getXceiverCount());
+ }
}
/**
@@ -84,8 +93,8 @@
in = new DataInputStream(
new BufferedInputStream(NetUtils.getInputStream(s),
SMALL_BUFFER_SIZE));
- final DataTransferProtocol.Op op = op(in);
- boolean local = s.getInetAddress().equals(s.getLocalAddress());
+ final DataTransferProtocol.Op op = readOp(in);
+
// Make sure the xciver count is not exceeded
int curXceiverCount = datanode.getXceiverCount();
if (curXceiverCount > dataXceiverServer.maxXceiverCount) {
@@ -93,45 +102,16 @@
+ " exceeds the limit of concurrent xcievers "
+ dataXceiverServer.maxXceiverCount);
}
- long startTime = DataNode.now();
- switch ( op ) {
- case READ_BLOCK:
- opReadBlock(in);
- datanode.myMetrics.readBlockOp.inc(DataNode.now() - startTime);
- if (local)
- datanode.myMetrics.readsFromLocalClient.inc();
- else
- datanode.myMetrics.readsFromRemoteClient.inc();
- break;
- case WRITE_BLOCK:
- opWriteBlock(in);
- datanode.myMetrics.writeBlockOp.inc(DataNode.now() - startTime);
- if (local)
- datanode.myMetrics.writesFromLocalClient.inc();
- else
- datanode.myMetrics.writesFromRemoteClient.inc();
- break;
- case REPLACE_BLOCK: // for balancing purpose; send to a destination
- opReplaceBlock(in);
- datanode.myMetrics.replaceBlockOp.inc(DataNode.now() - startTime);
- break;
- case COPY_BLOCK:
- // for balancing purpose; send to a proxy source
- opCopyBlock(in);
- datanode.myMetrics.copyBlockOp.inc(DataNode.now() - startTime);
- break;
- case BLOCK_CHECKSUM: //get the checksum of a block
- opBlockChecksum(in);
- datanode.myMetrics.blockChecksumOp.inc(DataNode.now() - startTime);
- break;
- default:
- throw new IOException("Unknown opcode " + op + " in data stream");
- }
+
+ opStartTime = DataNode.now();
+ processOp(op, in);
} catch (Throwable t) {
LOG.error(datanode.dnRegistration + ":DataXceiver",t);
} finally {
- LOG.debug(datanode.dnRegistration + ":Number of active connections is: "
- + datanode.getXceiverCount());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(datanode.dnRegistration + ":Number of active connections is: "
+ + datanode.getXceiverCount());
+ }
IOUtils.closeStream(in);
IOUtils.closeSocket(s);
dataXceiverServer.childSockets.remove(s);
@@ -142,7 +122,7 @@
* Read a block from the disk.
*/
@Override
- public void opReadBlock(DataInputStream in,
+ protected void opReadBlock(DataInputStream in,
long blockId, long blockGs, long startOffset, long length,
String clientName, AccessToken accessToken) throws IOException {
final Block block = new Block(blockId, 0 , blockGs);
@@ -213,13 +193,18 @@
IOUtils.closeStream(out);
IOUtils.closeStream(blockSender);
}
+
+ //update metrics
+ updateDuration(datanode.myMetrics.readBlockOp);
+ updateCounter(datanode.myMetrics.readsFromLocalClient,
+ datanode.myMetrics.readsFromRemoteClient);
}
/**
* Write a block to disk.
*/
@Override
- public void opWriteBlock(DataInputStream in, long blockId, long blockGs,
+ protected void opWriteBlock(DataInputStream in, long blockId, long blockGs,
int pipelineSize, boolean isRecovery,
String client, DatanodeInfo srcDataNode, DatanodeInfo[] targets,
AccessToken accessToken) throws IOException {
@@ -377,13 +362,18 @@
IOUtils.closeSocket(mirrorSock);
IOUtils.closeStream(blockReceiver);
}
+
+ //update metrics
+ updateDuration(datanode.myMetrics.writeBlockOp);
+ updateCounter(datanode.myMetrics.writesFromLocalClient,
+ datanode.myMetrics.writesFromRemoteClient);
}
/**
* Get block checksum (MD5 of CRC32).
*/
@Override
- public void opBlockChecksum(DataInputStream in,
+ protected void opBlockChecksum(DataInputStream in,
long blockId, long blockGs, AccessToken accessToken) throws IOException {
final Block block = new Block(blockId, 0 , blockGs);
DataOutputStream out = new DataOutputStream(NetUtils.getOutputStream(s,
@@ -433,13 +423,16 @@
IOUtils.closeStream(checksumIn);
IOUtils.closeStream(metadataIn);
}
+
+ //update metrics
+ updateDuration(datanode.myMetrics.blockChecksumOp);
}
/**
* Read a block from the disk and then sends it to a destination.
*/
@Override
- public void opCopyBlock(DataInputStream in,
+ protected void opCopyBlock(DataInputStream in,
long blockId, long blockGs, AccessToken accessToken) throws IOException {
// Read in the header
Block block = new Block(blockId, 0, blockGs);
@@ -499,6 +492,9 @@
IOUtils.closeStream(reply);
IOUtils.closeStream(blockSender);
}
+
+ //update metrics
+ updateDuration(datanode.myMetrics.copyBlockOp);
}
/**
@@ -506,7 +502,7 @@
* remove the copy from the source.
*/
@Override
- public void opReplaceBlock(DataInputStream in,
+ protected void opReplaceBlock(DataInputStream in,
long blockId, long blockGs, String sourceID, DatanodeInfo proxySource,
AccessToken accessToken) throws IOException {
/* read header */
@@ -606,8 +602,20 @@
IOUtils.closeStream(blockReceiver);
IOUtils.closeStream(proxyReply);
}
+
+ //update metrics
+ updateDuration(datanode.myMetrics.replaceBlockOp);
}
-
+
+ private void updateDuration(MetricsTimeVaryingRate mtvr) {
+ mtvr.inc(DataNode.now() - opStartTime);
+ }
+
+ private void updateCounter(MetricsTimeVaryingInt localCounter,
+ MetricsTimeVaryingInt remoteCounter) {
+ (isLocal? localCounter: remoteCounter).inc();
+ }
+
/**
* Utility function for sending a response.
* @param s socket to write to