Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2009/08/05 06:48:31 UTC

svn commit: r801057 - in /hadoop/hdfs/trunk: CHANGES.txt src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java

Author: szetszwo
Date: Wed Aug  5 04:48:30 2009
New Revision: 801057

URL: http://svn.apache.org/viewvc?rev=801057&view=rev
Log:
HDFS-524. Further DataTransferProtocol code refactoring.

Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=801057&r1=801056&r2=801057&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Wed Aug  5 04:48:30 2009
@@ -18,6 +18,9 @@
     HDFS-381. Remove blocks from DataNode maps when corresponding file
     is deleted. (Suresh Srinivas via rangadi)
 
+    HDFS-377. Separate codes which implement DataTransferProtocol.
+    (szetszwo)
+
     HDFS-396. NameNode image and edits directories are specified as URIs.
     (Luca Telloli via rangadi)
 
@@ -47,9 +50,14 @@
     only by the run-test-*-faul-inject targets.  (Konstantin Boudnik via
     szetszwo)
 
+    HDFS-446. Improvements to Offline Image Viewer. (Jakob Homan via shv)
+
     HADOOP-6160. Fix releaseaudit target to run on specific directories.
     (gkesavan)
 
+    HDFS-501. Use enum to define the constants in DataTransferProtocol.
+    (szetszwo)
+
     HDFS-508. Factor out BlockInfo from BlocksMap. (shv)
 
     HDFS-510. Rename DatanodeBlockInfo to be ReplicaInfo.
@@ -68,11 +76,11 @@
     HDFS-504. Update the modification time of a file when the file 
     is closed. (Chun Zhang via dhruba)
 
-    HDFS-446. Improvements to Offline Image Viewer. (Jakob Homan via shv)
-
     HDFS-498. Add development guide and documentation for the fault injection
     framework.  (Konstantin Boudnik via szetszwo)
 
+    HDFS-524. Further DataTransferProtocol code refactoring.  (szetszwo)
+
   BUG FIXES
     HDFS-76. Better error message to users when commands fail because of 
     lack of quota. Allow quota to be set even if the limit is lower than
@@ -81,9 +89,6 @@
     HADOOP-4687. HDFS is split from Hadoop Core. It is a subproject under 
     Hadoop (Owen O'Malley)
 
-    HDFS-377. Separate codes which implement DataTransferProtocol.
-    (szetszwo)
-
     HADOOP-6096. Fix Eclipse project and classpath files following project
     split. (tomwhite)
 
@@ -120,9 +125,6 @@
     HDFS-484. Fix bin-package and package target to package jar files.
     (gkesavan)
 
-    HDFS-501. Use enum to define the constants in DataTransferProtocol.
-    (szetszwo)
-
     HDFS-490. Eliminate the deprecated warnings introduced by H-5438.
     (He Yongqiang via szetszwo)
 

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java?rev=801057&r1=801056&r2=801057&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java Wed Aug  5 04:48:30 2009
@@ -249,8 +249,8 @@
 
   /** Receiver */
   public static abstract class Receiver {
-    /** Initialize a operation. */
-    public final Op op(DataInputStream in) throws IOException {
+    /** Read an Op.  It also checks protocol version. */
+    protected final Op readOp(DataInputStream in) throws IOException {
       final short version = in.readShort();
       if (version != DATA_TRANSFER_VERSION) {
         throw new IOException( "Version Mismatch" );
@@ -258,8 +258,32 @@
       return Op.read(in);
     }
 
+    /** Process op by the corresponding method. */
+    protected final void processOp(Op op, DataInputStream in
+        ) throws IOException {
+      switch(op) {
+      case READ_BLOCK:
+        opReadBlock(in);
+        break;
+      case WRITE_BLOCK:
+        opWriteBlock(in);
+        break;
+      case REPLACE_BLOCK:
+        opReplaceBlock(in);
+        break;
+      case COPY_BLOCK:
+        opCopyBlock(in);
+        break;
+      case BLOCK_CHECKSUM:
+        opBlockChecksum(in);
+        break;
+      default:
+        throw new IOException("Unknown op " + op + " in data stream");
+      }
+    }
+
     /** Receive OP_READ_BLOCK */
-    public final void opReadBlock(DataInputStream in) throws IOException {
+    private void opReadBlock(DataInputStream in) throws IOException {
       final long blockId = in.readLong();          
       final long blockGs = in.readLong();
       final long offset = in.readLong();
@@ -270,13 +294,16 @@
       opReadBlock(in, blockId, blockGs, offset, length, client, accesstoken);
     }
 
-    /** Abstract OP_READ_BLOCK method. */
-    public abstract void opReadBlock(DataInputStream in,
+    /**
+     * Abstract OP_READ_BLOCK method.
+     * Read a block.
+     */
+    protected abstract void opReadBlock(DataInputStream in,
         long blockId, long blockGs, long offset, long length,
         String client, AccessToken accesstoken) throws IOException;
     
     /** Receive OP_WRITE_BLOCK */
-    public final void opWriteBlock(DataInputStream in) throws IOException {
+    private void opWriteBlock(DataInputStream in) throws IOException {
       final long blockId = in.readLong();          
       final long blockGs = in.readLong();
       final int pipelineSize = in.readInt(); // num of datanodes in entire pipeline
@@ -298,14 +325,17 @@
           client, src, targets, accesstoken);
     }
 
-    /** Abstract OP_WRITE_BLOCK method. */
-    public abstract void opWriteBlock(DataInputStream in,
+    /**
+     * Abstract OP_WRITE_BLOCK method. 
+     * Write a block.
+     */
+    protected abstract void opWriteBlock(DataInputStream in,
         long blockId, long blockGs, int pipelineSize, boolean isRecovery,
         String client, DatanodeInfo src, DatanodeInfo[] targets,
         AccessToken accesstoken) throws IOException;
 
     /** Receive OP_REPLACE_BLOCK */
-    public final void opReplaceBlock(DataInputStream in) throws IOException {
+    private void opReplaceBlock(DataInputStream in) throws IOException {
       final long blockId = in.readLong();          
       final long blockGs = in.readLong();
       final String sourceId = Text.readString(in); // read del hint
@@ -315,13 +345,16 @@
       opReplaceBlock(in, blockId, blockGs, sourceId, src, accesstoken);
     }
 
-    /** Abstract OP_REPLACE_BLOCK method. */
-    public abstract void opReplaceBlock(DataInputStream in,
+    /**
+     * Abstract OP_REPLACE_BLOCK method.
+     * It is used for balancing purpose; send to a destination
+     */
+    protected abstract void opReplaceBlock(DataInputStream in,
         long blockId, long blockGs, String sourceId, DatanodeInfo src,
         AccessToken accesstoken) throws IOException;
 
     /** Receive OP_COPY_BLOCK */
-    public final void opCopyBlock(DataInputStream in) throws IOException {
+    private void opCopyBlock(DataInputStream in) throws IOException {
       final long blockId = in.readLong();          
       final long blockGs = in.readLong();
       final AccessToken accesstoken = readAccessToken(in);
@@ -329,12 +362,15 @@
       opCopyBlock(in, blockId, blockGs, accesstoken);
     }
 
-    /** Abstract OP_COPY_BLOCK method. */
-    public abstract void opCopyBlock(DataInputStream in,
+    /**
+     * Abstract OP_COPY_BLOCK method.
+     * It is used for balancing purpose; send to a proxy source.
+     */
+    protected abstract void opCopyBlock(DataInputStream in,
         long blockId, long blockGs, AccessToken accesstoken) throws IOException;
 
     /** Receive OP_BLOCK_CHECKSUM */
-    public final void opBlockChecksum(DataInputStream in) throws IOException {
+    private void opBlockChecksum(DataInputStream in) throws IOException {
       final long blockId = in.readLong();          
       final long blockGs = in.readLong();
       final AccessToken accesstoken = readAccessToken(in);
@@ -342,8 +378,11 @@
       opBlockChecksum(in, blockId, blockGs, accesstoken);
     }
 
-    /** Abstract OP_BLOCK_CHECKSUM method. */
-    public abstract void opBlockChecksum(DataInputStream in,
+    /**
+     * Abstract OP_BLOCK_CHECKSUM method.
+     * Get the checksum of a block 
+     */
+    protected abstract void opBlockChecksum(DataInputStream in,
         long blockId, long blockGs, AccessToken accesstoken) throws IOException;
 
     /** Read an AccessToken */

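Note (not part of the commit): the diff above moves the opcode dispatch into the Receiver itself — readOp() reads and version-checks the operation, processOp() parses the op-specific header and calls the matching protected abstract op* handler. A minimal sketch of how a subclass is expected to use the refactored class is shown below; the class name MinimalReceiver and the serve() method are illustrative only, while the op* signatures are taken from the diff above.

    import java.io.DataInputStream;
    import java.io.IOException;

    import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
    import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
    import org.apache.hadoop.security.AccessToken;

    class MinimalReceiver extends DataTransferProtocol.Receiver {

      /** Read one operation from the stream and dispatch it. */
      void serve(DataInputStream in) throws IOException {
        // readOp() verifies DATA_TRANSFER_VERSION and returns the Op enum value.
        final DataTransferProtocol.Op op = readOp(in);
        // processOp() reads the op-specific header fields and invokes the
        // corresponding protected abstract op* method implemented below.
        processOp(op, in);
      }

      @Override
      protected void opReadBlock(DataInputStream in, long blockId, long blockGs,
          long offset, long length, String client, AccessToken accesstoken)
          throws IOException {
        // send the requested block range to the client here
      }

      @Override
      protected void opWriteBlock(DataInputStream in, long blockId, long blockGs,
          int pipelineSize, boolean isRecovery, String client, DatanodeInfo src,
          DatanodeInfo[] targets, AccessToken accesstoken) throws IOException {
        // receive the block and forward it down the pipeline here
      }

      @Override
      protected void opReplaceBlock(DataInputStream in, long blockId, long blockGs,
          String sourceId, DatanodeInfo src, AccessToken accesstoken)
          throws IOException {
        // receive a block moved for balancing here
      }

      @Override
      protected void opCopyBlock(DataInputStream in, long blockId, long blockGs,
          AccessToken accesstoken) throws IOException {
        // send a copy of the block to the requesting proxy source here
      }

      @Override
      protected void opBlockChecksum(DataInputStream in, long blockId, long blockGs,
          AccessToken accesstoken) throws IOException {
        // compute and return the checksum of the block here
      }
    }
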
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=801057&r1=801056&r2=801057&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Wed Aug  5 04:48:30 2009
@@ -43,6 +43,8 @@
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.MD5Hash;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.metrics.util.MetricsTimeVaryingInt;
+import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.AccessToken;
 import org.apache.hadoop.security.AccessTokenHandler;
@@ -57,22 +59,29 @@
   public static final Log LOG = DataNode.LOG;
   static final Log ClientTraceLog = DataNode.ClientTraceLog;
   
-  Socket s;
-  final String remoteAddress; // address of remote side
-  final String localAddress;  // local address of this daemon
-  DataNode datanode;
-  DataXceiverServer dataXceiverServer;
+  private final Socket s;
+  private final boolean isLocal; //is a local connection?
+  private final String remoteAddress; // address of remote side
+  private final String localAddress;  // local address of this daemon
+  private final DataNode datanode;
+  private final DataXceiverServer dataXceiverServer;
+
+  private long opStartTime; //the start time of receiving an Op
   
   public DataXceiver(Socket s, DataNode datanode, 
       DataXceiverServer dataXceiverServer) {
-    
     this.s = s;
+    this.isLocal = s.getInetAddress().equals(s.getLocalAddress());
     this.datanode = datanode;
     this.dataXceiverServer = dataXceiverServer;
     dataXceiverServer.childSockets.put(s, s);
     remoteAddress = s.getRemoteSocketAddress().toString();
     localAddress = s.getLocalSocketAddress().toString();
-    LOG.debug("Number of active connections is: " + datanode.getXceiverCount());
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Number of active connections is: "
+          + datanode.getXceiverCount());
+    }
   }
 
   /**
@@ -84,8 +93,8 @@
       in = new DataInputStream(
           new BufferedInputStream(NetUtils.getInputStream(s), 
                                   SMALL_BUFFER_SIZE));
-      final DataTransferProtocol.Op op = op(in);
-      boolean local = s.getInetAddress().equals(s.getLocalAddress());
+      final DataTransferProtocol.Op op = readOp(in);
+
       // Make sure the xciver count is not exceeded
       int curXceiverCount = datanode.getXceiverCount();
       if (curXceiverCount > dataXceiverServer.maxXceiverCount) {
@@ -93,45 +102,16 @@
                               + " exceeds the limit of concurrent xcievers "
                               + dataXceiverServer.maxXceiverCount);
       }
-      long startTime = DataNode.now();
-      switch ( op ) {
-      case READ_BLOCK:
-        opReadBlock(in);
-        datanode.myMetrics.readBlockOp.inc(DataNode.now() - startTime);
-        if (local)
-          datanode.myMetrics.readsFromLocalClient.inc();
-        else
-          datanode.myMetrics.readsFromRemoteClient.inc();
-        break;
-      case WRITE_BLOCK:
-        opWriteBlock(in);
-        datanode.myMetrics.writeBlockOp.inc(DataNode.now() - startTime);
-        if (local)
-          datanode.myMetrics.writesFromLocalClient.inc();
-        else
-          datanode.myMetrics.writesFromRemoteClient.inc();
-        break;
-      case REPLACE_BLOCK: // for balancing purpose; send to a destination
-        opReplaceBlock(in);
-        datanode.myMetrics.replaceBlockOp.inc(DataNode.now() - startTime);
-        break;
-      case COPY_BLOCK:
-            // for balancing purpose; send to a proxy source
-        opCopyBlock(in);
-        datanode.myMetrics.copyBlockOp.inc(DataNode.now() - startTime);
-        break;
-      case BLOCK_CHECKSUM: //get the checksum of a block
-        opBlockChecksum(in);
-        datanode.myMetrics.blockChecksumOp.inc(DataNode.now() - startTime);
-        break;
-      default:
-        throw new IOException("Unknown opcode " + op + " in data stream");
-      }
+
+      opStartTime = DataNode.now();
+      processOp(op, in);
     } catch (Throwable t) {
       LOG.error(datanode.dnRegistration + ":DataXceiver",t);
     } finally {
-      LOG.debug(datanode.dnRegistration + ":Number of active connections is: "
-                               + datanode.getXceiverCount());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(datanode.dnRegistration + ":Number of active connections is: "
+            + datanode.getXceiverCount());
+      }
       IOUtils.closeStream(in);
       IOUtils.closeSocket(s);
       dataXceiverServer.childSockets.remove(s);
@@ -142,7 +122,7 @@
    * Read a block from the disk.
    */
   @Override
-  public void opReadBlock(DataInputStream in,
+  protected void opReadBlock(DataInputStream in,
       long blockId, long blockGs, long startOffset, long length,
       String clientName, AccessToken accessToken) throws IOException {
     final Block block = new Block(blockId, 0 , blockGs);
@@ -213,13 +193,18 @@
       IOUtils.closeStream(out);
       IOUtils.closeStream(blockSender);
     }
+
+    //update metrics
+    updateDuration(datanode.myMetrics.readBlockOp);
+    updateCounter(datanode.myMetrics.readsFromLocalClient,
+                  datanode.myMetrics.readsFromRemoteClient);
   }
 
   /**
    * Write a block to disk.
    */
   @Override
-  public void opWriteBlock(DataInputStream in, long blockId, long blockGs,
+  protected void opWriteBlock(DataInputStream in, long blockId, long blockGs,
       int pipelineSize, boolean isRecovery,
       String client, DatanodeInfo srcDataNode, DatanodeInfo[] targets,
       AccessToken accessToken) throws IOException {
@@ -377,13 +362,18 @@
       IOUtils.closeSocket(mirrorSock);
       IOUtils.closeStream(blockReceiver);
     }
+
+    //update metrics
+    updateDuration(datanode.myMetrics.writeBlockOp);
+    updateCounter(datanode.myMetrics.writesFromLocalClient,
+                  datanode.myMetrics.writesFromRemoteClient);
   }
 
   /**
    * Get block checksum (MD5 of CRC32).
    */
   @Override
-  public void opBlockChecksum(DataInputStream in,
+  protected void opBlockChecksum(DataInputStream in,
       long blockId, long blockGs, AccessToken accessToken) throws IOException {
     final Block block = new Block(blockId, 0 , blockGs);
     DataOutputStream out = new DataOutputStream(NetUtils.getOutputStream(s,
@@ -433,13 +423,16 @@
       IOUtils.closeStream(checksumIn);
       IOUtils.closeStream(metadataIn);
     }
+
+    //update metrics
+    updateDuration(datanode.myMetrics.blockChecksumOp);
   }
 
   /**
    * Read a block from the disk and then sends it to a destination.
    */
   @Override
-  public void opCopyBlock(DataInputStream in,
+  protected void opCopyBlock(DataInputStream in,
       long blockId, long blockGs, AccessToken accessToken) throws IOException {
     // Read in the header
     Block block = new Block(blockId, 0, blockGs);
@@ -499,6 +492,9 @@
       IOUtils.closeStream(reply);
       IOUtils.closeStream(blockSender);
     }
+
+    //update metrics    
+    updateDuration(datanode.myMetrics.copyBlockOp);
   }
 
   /**
@@ -506,7 +502,7 @@
    * remove the copy from the source.
    */
   @Override
-  public void opReplaceBlock(DataInputStream in,
+  protected void opReplaceBlock(DataInputStream in,
       long blockId, long blockGs, String sourceID, DatanodeInfo proxySource,
       AccessToken accessToken) throws IOException {
     /* read header */
@@ -606,8 +602,20 @@
       IOUtils.closeStream(blockReceiver);
       IOUtils.closeStream(proxyReply);
     }
+
+    //update metrics
+    updateDuration(datanode.myMetrics.replaceBlockOp);
   }
-  
+
+  private void updateDuration(MetricsTimeVaryingRate mtvr) {
+    mtvr.inc(DataNode.now() - opStartTime);
+  }
+
+  private void updateCounter(MetricsTimeVaryingInt localCounter,
+      MetricsTimeVaryingInt remoteCounter) {
+    (isLocal? localCounter: remoteCounter).inc();
+  }
+
   /**
    * Utility function for sending a response.
    * @param s socket to write to
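
Note (not part of the commit): the DataXceiver changes replace the old inline switch-based metric accounting with two small helpers, updateDuration() and updateCounter(), driven by the opStartTime field set before dispatch and the isLocal flag computed once in the constructor. The following self-contained sketch illustrates that pattern only; it does not use the Hadoop metrics classes (AtomicLong and System.nanoTime() stand in for MetricsTimeVaryingRate/MetricsTimeVaryingInt and DataNode.now()), and all names in it are hypothetical.

    import java.util.concurrent.atomic.AtomicLong;

    class OpMetricsSketch {
      private final boolean isLocal;   // decided once per connection
      private long opStartTime;        // set once, just before dispatch

      private final AtomicLong readBlockNanos = new AtomicLong();
      private final AtomicLong readsFromLocalClient = new AtomicLong();
      private final AtomicLong readsFromRemoteClient = new AtomicLong();

      OpMetricsSketch(boolean isLocal) {
        this.isLocal = isLocal;
      }

      void run() {
        opStartTime = System.nanoTime();
        opReadBlock();                 // processOp() would dispatch here
      }

      private void opReadBlock() {
        // ... do the actual work ...
        // each op handler ends with the shared accounting calls
        updateDuration(readBlockNanos);
        updateCounter(readsFromLocalClient, readsFromRemoteClient);
      }

      // mirrors DataXceiver.updateDuration(MetricsTimeVaryingRate)
      private void updateDuration(AtomicLong total) {
        total.addAndGet(System.nanoTime() - opStartTime);
      }

      // mirrors DataXceiver.updateCounter(MetricsTimeVaryingInt, MetricsTimeVaryingInt)
      private void updateCounter(AtomicLong local, AtomicLong remote) {
        (isLocal ? local : remote).incrementAndGet();
      }
    }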