You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2011/06/11 02:36:13 UTC

svn commit: r1134492 [1/4] - in /hadoop/hdfs/trunk: ./ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/apache/hadoop/hdfs/protocol/proto/ src/java/org/apache/hadoop/hdfs/server/balancer/ src/java/org/apache/hadoo...

Author: todd
Date: Sat Jun 11 00:36:12 2011
New Revision: 1134492

URL: http://svn.apache.org/viewvc?rev=1134492&view=rev
Log:
HDFS-2058. Change Data Transfer wire protocol to use protocol buffers. Contributed by Todd Lipcon.

Added:
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtoUtil.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/HdfsProtoUtil.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/proto/
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/proto/DataTransferProtos.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/proto/HdfsProtos.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/util/ByteBufferOutputStream.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/util/ExactSizeInputStream.java
    hadoop/hdfs/trunk/src/proto/
    hadoop/hdfs/trunk/src/proto/datatransfer.proto
    hadoop/hdfs/trunk/src/proto/hdfs.proto
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/util/TestExactSizeInputStream.java
Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/build.xml
    hadoop/hdfs/trunk/ivy.xml
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/BlockReader.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
    hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj
    hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/DataTransferProtocolAspects.aj
    hadoop/hdfs/trunk/src/test/findbugsExcludeFile.xml
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestTransferRbw.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlockTokenWithDFS.java

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=1134492&r1=1134491&r2=1134492&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Sat Jun 11 00:36:12 2011
@@ -285,6 +285,9 @@ Trunk (unreleased changes)
     layout to be consistent across the binary tgz, rpm, and deb.
     (Eric Yang via omalley)
 
+    HDFS-2058. Change Data Transfer wire protocol to use protocol buffers.
+    (todd)
+
   IMPROVEMENTS
 
     HDFS-1875. MiniDFSCluster hard-codes dfs.datanode.address to localhost

Modified: hadoop/hdfs/trunk/build.xml
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/build.xml?rev=1134492&r1=1134491&r2=1134492&view=diff
==============================================================================
--- hadoop/hdfs/trunk/build.xml (original)
+++ hadoop/hdfs/trunk/build.xml Sat Jun 11 00:36:12 2011
@@ -40,6 +40,7 @@
 
   <property name="src.dir" value="${basedir}/src"/>  	
   <property name="java.src.dir" value="${src.dir}/java"/>
+  <property name="proto.src.dir" value="${src.dir}/proto"/>
   <property name="anttasks.dir" value="${basedir}/src/ant"/>
   <property name="lib.dir" value="${basedir}/lib"/>
   <property name="conf.dir" value="${basedir}/conf"/>
@@ -201,6 +202,9 @@
   <property name="build.dir.eclipse-test-classes" value="${build.dir.eclipse}/classes-test"/>
   <property name="build.dir.eclipse-contrib-classes" value="${build.dir.eclipse}/classes-contrib"/>
 
+  <!-- Protoc properties -->
+  <property name="protoc" value="protoc" />
+
   <property name="clover.jar" location="${clover.home}/lib/clover.jar"/>
   <available property="clover.present" file="${clover.jar}" />
 
@@ -923,7 +927,9 @@
       bottom="Copyright &amp;copy; ${year} The Apache Software Foundation"
       maxmemory="${javadoc.maxmemory}">
 
-        <packageset dir="${java.src.dir}"/>
+        <packageset dir="${java.src.dir}">
+          <exclude name="org/apache/hadoop/hdfs/protocol/proto" />
+        </packageset>
         <link href="${javadoc.link.java}"/>
         <classpath >
           <path refid="classpath" />
@@ -943,7 +949,9 @@
          <param name="-apidir" value="${jdiff.xml.dir}"/>
          <param name="-apiname" value="hadoop-hdfs ${version}"/>
        </doclet>
-       <packageset dir="src/java"/>
+       <packageset dir="${java.src.dir}">
+         <exclude name="org/apache/hadoop/hdfs/protocol/proto" />
+       </packageset>
        <classpath >
          <path refid="classpath" />
          <path refid="jdiff-classpath" />
@@ -1365,6 +1373,8 @@
         <exclude name="lib/jdiff/"/>
         <exclude name="**/conf/*" />
         <exclude name="webapps/**/WEB-INF/web.xml"/>
+        <!-- generated code for protobufs doesn't have headers -->
+        <exclude name="src/java/org/apache/hadoop/hdfs/protocol/proto/*Protos.java" />
         <exclude name="src/docs/releasenotes.html" />
         <exclude name="src/test/hdfs/org/apache/hadoop/cli/clitest_data/" />
         <exclude name="src/test/hdfs/org/apache/hadoop/hdfs/tools/offlineEditsViewer/editsStored*" />
@@ -1672,6 +1682,27 @@
     </copy>
   </target>
 
+  <target name="generate-protos"
+    description="Generate Java code from protocol buffer definitions">
+    <exec executable="bash" resultproperty="which.protoc.result" outputproperty="">
+      <arg value="-c" />
+      <arg value="which ${protoc}" />
+    </exec>
+    <condition property="protoc.found">
+      <equals arg1="${which.protoc.result}" arg2="0" />
+    </condition>
+    <fail unless="protoc.found"
+      message="No protoc compiler found. Please pass -Dprotoc=/path/to/protoc if it is not on your path." />
+
+    <exec executable="${protoc}" failonerror="true">
+      <arg value="--java_out=${java.src.dir}" />
+      <arg value="--proto_path=${proto.src.dir}" />
+      <arg value="${proto.src.dir}/hdfs.proto" />
+      <arg value="${proto.src.dir}/datatransfer.proto" />
+    </exec>
+    <echo message="Generated protocol buffer code successfully." />
+  </target>
+
   <target name="ivy-init-dirs">
     <mkdir dir="${build.ivy.dir}" />
     <mkdir dir="${build.ivy.lib.dir}" />

Modified: hadoop/hdfs/trunk/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/ivy.xml?rev=1134492&r1=1134491&r2=1134492&view=diff
==============================================================================
--- hadoop/hdfs/trunk/ivy.xml (original)
+++ hadoop/hdfs/trunk/ivy.xml Sat Jun 11 00:36:12 2011
@@ -67,6 +67,7 @@
     <dependency org="commons-logging" name="commons-logging" rev="${commons-logging.version}" conf="compile->master"/>
     <dependency org="commons-daemon" name="commons-daemon" rev="${commons-daemon.version}" conf="hdfs->default" />
     <dependency org="log4j" name="log4j" rev="${log4j.version}" conf="common->master"/>
+    <dependency org="com.google.protobuf" name="protobuf-java" rev="2.4.0a" conf="common->master"/>
     <dependency org="org.apache.hadoop" name="avro" rev="${avro.version}" conf="compile->master">
       <exclude module="ant"/>
       <exclude module="jetty"/>

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/BlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/BlockReader.java?rev=1134492&r1=1134491&r2=1134492&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/BlockReader.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/BlockReader.java Sat Jun 11 00:36:12 2011
@@ -17,10 +17,6 @@
  */
 package org.apache.hadoop.hdfs;
 
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.CHECKSUM_OK;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR_ACCESS_TOKEN;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.SUCCESS;
-
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
@@ -37,6 +33,10 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
+import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
@@ -45,6 +45,9 @@ import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
 
+import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed;
+
+
 /** This is a wrapper around connection to datanode
  * and understands checksum, offset etc.
  *
@@ -138,9 +141,9 @@ public class BlockReader extends FSInput
     // if eos was set in the previous read, send a status code to the DN
     if (eos && !eosBefore && nRead >= 0) {
       if (needChecksum()) {
-        sendReadResult(dnSock, CHECKSUM_OK);
+        sendReadResult(dnSock, Status.CHECKSUM_OK);
       } else {
-        sendReadResult(dnSock, SUCCESS);
+        sendReadResult(dnSock, Status.SUCCESS);
       }
     }
     return nRead;
@@ -313,20 +316,13 @@ public class BlockReader extends FSInput
         pos + bytesToRead >= bytesNeededToFinish) {
 
       // Read header
-      int packetLen = in.readInt();
-      long offsetInBlock = in.readLong();
-      long seqno = in.readLong();
-      boolean lastPacketInBlock = in.readBoolean();
-      int dataLen = in.readInt();
+      PacketHeader hdr = new PacketHeader();
+      hdr.readFields(in);
 
-      if (!lastPacketInBlock ||
-          dataLen != 0) {
+      if (!hdr.isLastPacketInBlock() ||
+          hdr.getDataLen() != 0) {
         throw new IOException("Expected empty end-of-read packet! Header: " +
-                              "(packetLen : " + packetLen + 
-                              ", offsetInBlock : " + offsetInBlock +
-                              ", seqno : " + seqno + 
-                              ", lastInBlock : " + lastPacketInBlock +
-                              ", dataLen : " + dataLen);
+                              hdr);
       }
 
       eos = true;
@@ -422,9 +418,10 @@ public class BlockReader extends FSInput
         new BufferedInputStream(NetUtils.getInputStream(sock), 
                                 bufferSize));
     
-    DataTransferProtocol.Status status = DataTransferProtocol.Status.read(in);
-    if (status != SUCCESS) {
-      if (status == ERROR_ACCESS_TOKEN) {
+    BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
+        vintPrefixed(in));
+    if (status.getStatus() != Status.SUCCESS) {
+      if (status.getStatus() == Status.ERROR_ACCESS_TOKEN) {
         throw new InvalidBlockTokenException(
             "Got access token error for OP_READ_BLOCK, self="
                 + sock.getLocalSocketAddress() + ", remote="
@@ -499,11 +496,16 @@ public class BlockReader extends FSInput
    * closing our connection (which we will re-open), but won't affect
    * data correctness.
    */
-  void sendReadResult(Socket sock, DataTransferProtocol.Status statusCode) {
+  void sendReadResult(Socket sock, Status statusCode) {
     assert !sentStatusCode : "already sent status code to " + sock;
     try {
       OutputStream out = NetUtils.getOutputStream(sock, HdfsConstants.WRITE_TIMEOUT);
-      statusCode.writeOutputStream(out);
+      
+      ClientReadStatusProto.newBuilder()
+        .setStatus(statusCode)
+        .build()
+        .writeDelimitedTo(out);
+
       out.flush();
       sentStatusCode = true;
     } catch (IOException e) {

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1134492&r1=1134491&r2=1134492&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java Sat Jun 11 00:36:12 2011
@@ -19,8 +19,6 @@
 package org.apache.hadoop.hdfs;
 
 import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Op.BLOCK_CHECKSUM;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR_ACCESS_TOKEN;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.SUCCESS;
 
 import java.io.BufferedOutputStream;
 import java.io.ByteArrayInputStream;
@@ -75,7 +73,11 @@ import org.apache.hadoop.hdfs.protocol.H
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
+import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
@@ -1118,9 +1120,11 @@ public class DFSClient implements FSCons
           DataTransferProtocol.Sender.opBlockChecksum(out, block,
               lb.getBlockToken());
 
-          final DataTransferProtocol.Status reply = DataTransferProtocol.Status.read(in);
-          if (reply != SUCCESS) {
-            if (reply == ERROR_ACCESS_TOKEN
+          final BlockOpResponseProto reply =
+            BlockOpResponseProto.parseFrom(HdfsProtoUtil.vintPrefixed(in));
+
+          if (reply.getStatus() != Status.SUCCESS) {
+            if (reply.getStatus() == Status.ERROR_ACCESS_TOKEN
                 && i > lastRetriedIndex) {
               if (LOG.isDebugEnabled()) {
                 LOG.debug("Got access token error in response to OP_BLOCK_CHECKSUM "
@@ -1138,9 +1142,12 @@ public class DFSClient implements FSCons
                   + block + " from datanode " + datanodes[j].getName());
             }
           }
+          
+          OpBlockChecksumResponseProto checksumData =
+            reply.getChecksumResponse();
 
           //read byte-per-checksum
-          final int bpc = in.readInt(); 
+          final int bpc = checksumData.getBytesPerCrc();
           if (i == 0) { //first block
             bytesPerCRC = bpc;
           }
@@ -1150,13 +1157,14 @@ public class DFSClient implements FSCons
           }
           
           //read crc-per-block
-          final long cpb = in.readLong();
+          final long cpb = checksumData.getCrcPerBlock();
           if (locatedblocks.size() > 1 && i == 0) {
             crcPerBlock = cpb;
           }
 
           //read md5
-          final MD5Hash md5 = MD5Hash.read(in);
+          final MD5Hash md5 = new MD5Hash(
+              checksumData.getMd5().toByteArray());
           md5.write(md5out);
           
           done = true;

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1134492&r1=1134491&r2=1134492&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java Sat Jun 11 00:36:12 2011
@@ -17,8 +17,9 @@
  */
 package org.apache.hadoop.hdfs;
 
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR_ACCESS_TOKEN;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.SUCCESS;
+import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.SUCCESS;
+import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR;
+
 
 import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
@@ -56,7 +57,10 @@ import org.apache.hadoop.hdfs.protocol.E
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
+import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -75,6 +79,8 @@ import org.apache.hadoop.util.Progressab
 import org.apache.hadoop.util.PureJavaCrc32;
 import org.apache.hadoop.util.StringUtils;
 
+
+
 /****************************************************************
  * DFSOutputStream creates files from a stream of bytes.
  *
@@ -650,7 +656,7 @@ class DFSOutputStream extends FSOutputSu
             long seqno = ack.getSeqno();
             // processes response status from datanodes.
             for (int i = ack.getNumOfReplies()-1; i >=0  && dfsClient.clientRunning; i--) {
-              final DataTransferProtocol.Status reply = ack.getReply(i);
+              final Status reply = ack.getReply(i);
               if (reply != SUCCESS) {
                 errorIndex = i; // first bad datanode
                 throw new IOException("Bad response " + reply +
@@ -848,7 +854,9 @@ class DFSOutputStream extends FSOutputSu
 
         //ack
         in = new DataInputStream(NetUtils.getInputStream(sock));
-        if (SUCCESS != DataTransferProtocol.Status.read(in)) {
+        BlockOpResponseProto response =
+          BlockOpResponseProto.parseFrom(HdfsProtoUtil.vintPrefixed(in));
+        if (SUCCESS != response.getStatus()) {
           throw new IOException("Failed to add a datanode");
         }
       } finally {
@@ -990,7 +998,7 @@ class DFSOutputStream extends FSOutputSu
     //
     private boolean createBlockOutputStream(DatanodeInfo[] nodes, long newGS,
         boolean recoveryFlag) {
-      DataTransferProtocol.Status pipelineStatus = SUCCESS;
+      Status pipelineStatus = SUCCESS;
       String firstBadLink = "";
       if (DFSClient.LOG.isDebugEnabled()) {
         for (int i = 0; i < nodes.length; i++) {
@@ -1023,10 +1031,13 @@ class DFSOutputStream extends FSOutputSu
         out.flush();
 
         // receive ack for connect
-        pipelineStatus = DataTransferProtocol.Status.read(blockReplyStream);
-        firstBadLink = Text.readString(blockReplyStream);
+        BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
+            HdfsProtoUtil.vintPrefixed(blockReplyStream));
+        pipelineStatus = resp.getStatus();
+        firstBadLink = resp.getFirstBadLink();
+        
         if (pipelineStatus != SUCCESS) {
-          if (pipelineStatus == ERROR_ACCESS_TOKEN) {
+          if (pipelineStatus == Status.ERROR_ACCESS_TOKEN) {
             throw new InvalidBlockTokenException(
                 "Got access token error for connect ack with firstBadLink as "
                     + firstBadLink);

Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtoUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtoUtil.java?rev=1134492&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtoUtil.java (added)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtoUtil.java Sat Jun 11 00:36:12 2011
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol;
+
+
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.security.token.Token;
+
+
+/**
+ * Static utilities for dealing with the protocol buffers used by the
+ * Data Transfer Protocol.
+ */
+abstract class DataTransferProtoUtil {
+
+  static DataTransferProtocol.BlockConstructionStage fromProto(
+      OpWriteBlockProto.BlockConstructionStage stage) {
+    return BlockConstructionStage.valueOf(BlockConstructionStage.class,
+        stage.name());
+  }
+
+  static OpWriteBlockProto.BlockConstructionStage toProto(
+      BlockConstructionStage stage) {
+    return OpWriteBlockProto.BlockConstructionStage.valueOf(
+        stage.name());
+  }
+
+  static ClientOperationHeaderProto buildClientHeader(ExtendedBlock blk,
+      String client, Token<BlockTokenIdentifier> blockToken) {
+    ClientOperationHeaderProto header =
+      ClientOperationHeaderProto.newBuilder()
+        .setBaseHeader(buildBaseHeader(blk, blockToken))
+        .setClientName(client)
+        .build();
+    return header;
+  }
+
+  static BaseHeaderProto buildBaseHeader(ExtendedBlock blk,
+      Token<BlockTokenIdentifier> blockToken) {
+    return BaseHeaderProto.newBuilder()
+      .setBlock(HdfsProtoUtil.toProto(blk))
+      .setToken(HdfsProtoUtil.toProto(blockToken))
+      .build();
+  }
+}

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java?rev=1134492&r1=1134491&r2=1134492&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java Sat Jun 11 00:36:12 2011
@@ -22,8 +22,10 @@ import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -32,12 +34,31 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpTransferBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PacketHeaderProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.hdfs.util.ByteBufferOutputStream;
 import org.apache.hadoop.security.token.Token;
 
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtoUtil.fromProto;
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtoUtil.toProto;
+import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.fromProto;
+import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.fromProtos;
+import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.toProto;
+import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.toProtos;
+import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed;
+
+import com.google.protobuf.Message;
+
 /**
  * Transfer data to/from datanode using a streaming protocol.
  */
@@ -89,373 +110,8 @@ public interface DataTransferProtocol {
     public void write(DataOutput out) throws IOException {
       out.write(code);
     }
-
-    /** Base class for all headers. */
-    private static abstract class BaseHeader implements Writable {
-      private ExtendedBlock block;
-      private Token<BlockTokenIdentifier> blockToken;
-      
-      private BaseHeader() {}
-      
-      private BaseHeader(
-          final ExtendedBlock block,
-          final Token<BlockTokenIdentifier> blockToken) {
-        this.block = block;
-        this.blockToken = blockToken;
-      }
-
-      /** @return the extended block. */
-      public final ExtendedBlock getBlock() {
-        return block;
-      }
-
-      /** @return the block token. */
-      public final Token<BlockTokenIdentifier> getBlockToken() {
-        return blockToken;
-      }
-
-      @Override
-      public void write(DataOutput out) throws IOException {
-        block.writeId(out);
-        blockToken.write(out);
-      }
-
-      @Override
-      public void readFields(DataInput in) throws IOException {
-        block = new ExtendedBlock();
-        block.readId(in);
-
-        blockToken = new Token<BlockTokenIdentifier>();
-        blockToken.readFields(in);
-      }
-    }
-
-    /** Base header for all client operation. */
-    private static abstract class ClientOperationHeader extends BaseHeader {
-      private String clientName;
-      
-      private ClientOperationHeader() {}
-      
-      private ClientOperationHeader(
-          final ExtendedBlock block,
-          final Token<BlockTokenIdentifier> blockToken,
-          final String clientName) {
-        super(block, blockToken);
-        this.clientName = clientName;
-      }
-
-      /** @return client name. */
-      public final String getClientName() {
-        return clientName;
-      }
-
-      @Override
-      public void write(DataOutput out) throws IOException {
-        super.write(out);
-        Text.writeString(out, clientName);
-      }
-
-      @Override
-      public void readFields(DataInput in) throws IOException {
-        super.readFields(in);
-        clientName = Text.readString(in);
-      }
-    }
-
-    /** {@link Op#READ_BLOCK} header. */
-    public static class ReadBlockHeader extends ClientOperationHeader {
-      private long offset;
-      private long length;
-
-      /** Default constructor */
-      public ReadBlockHeader() {}
-
-      /** Constructor with all parameters */
-      public ReadBlockHeader(
-          final ExtendedBlock blk,
-          final Token<BlockTokenIdentifier> blockToken,
-          final String clientName,
-          final long offset,
-          final long length) {
-        super(blk, blockToken, clientName);
-        this.offset = offset;
-        this.length = length;
-      }
-
-      /** @return the offset */
-      public long getOffset() {
-        return offset;
-      }
-
-      /** @return the length */
-      public long getLength() {
-        return length;
-      }
-
-      @Override
-      public void write(DataOutput out) throws IOException {
-        super.write(out);
-        out.writeLong(offset);
-        out.writeLong(length);
-      }
-
-      @Override
-      public void readFields(DataInput in) throws IOException {
-        super.readFields(in);
-        offset = in.readLong();
-        length = in.readLong();
-      }
-    }
-
-    /** {@link Op#WRITE_BLOCK} header. */
-    public static class WriteBlockHeader extends ClientOperationHeader {
-      private DatanodeInfo[] targets;
-
-      private DatanodeInfo source;
-      private BlockConstructionStage stage;
-      private int pipelineSize;
-      private long minBytesRcvd;
-      private long maxBytesRcvd;
-      private long latestGenerationStamp;
-      
-      /** Default constructor */
-      public WriteBlockHeader() {}
-
-      /** Constructor with all parameters */
-      public WriteBlockHeader(
-          final ExtendedBlock blk,
-          final Token<BlockTokenIdentifier> blockToken,
-          final String clientName,
-          final DatanodeInfo[] targets,
-          final DatanodeInfo source,
-          final BlockConstructionStage stage,
-          final int pipelineSize,
-          final long minBytesRcvd,
-          final long maxBytesRcvd,
-          final long latestGenerationStamp
-          ) throws IOException {
-        super(blk, blockToken, clientName);
-        this.targets = targets;
-        this.source = source;
-        this.stage = stage;
-        this.pipelineSize = pipelineSize;
-        this.minBytesRcvd = minBytesRcvd;
-        this.maxBytesRcvd = maxBytesRcvd;
-        this.latestGenerationStamp = latestGenerationStamp;
-      }
-
-      /** @return targets. */
-      public DatanodeInfo[] getTargets() {
-        return targets;
-      }
-
-      /** @return the source */
-      public DatanodeInfo getSource() {
-        return source;
-      }
-
-      /** @return the stage */
-      public BlockConstructionStage getStage() {
-        return stage;
-      }
-
-      /** @return the pipeline size */
-      public int getPipelineSize() {
-        return pipelineSize;
-      }
-
-      /** @return the minimum bytes received. */
-      public long getMinBytesRcvd() {
-        return minBytesRcvd;
-      }
-
-      /** @return the maximum bytes received. */
-      public long getMaxBytesRcvd() {
-        return maxBytesRcvd;
-      }
-
-      /** @return the latest generation stamp */
-      public long getLatestGenerationStamp() {
-        return latestGenerationStamp;
-      }
-
-      @Override
-      public void write(DataOutput out) throws IOException {
-        super.write(out);
-        Sender.write(out, 1, targets);
-
-        out.writeBoolean(source != null);
-        if (source != null) {
-          source.write(out);
-        }
-
-        stage.write(out);
-        out.writeInt(pipelineSize);
-        WritableUtils.writeVLong(out, minBytesRcvd);
-        WritableUtils.writeVLong(out, maxBytesRcvd);
-        WritableUtils.writeVLong(out, latestGenerationStamp);
-      }
-
-      @Override
-      public void readFields(DataInput in) throws IOException {
-        super.readFields(in);
-        targets = Receiver.readDatanodeInfos(in);
-
-        source = in.readBoolean()? DatanodeInfo.read(in): null;
-        stage = BlockConstructionStage.readFields(in);
-        pipelineSize = in.readInt(); // num of datanodes in entire pipeline
-        minBytesRcvd = WritableUtils.readVLong(in);
-        maxBytesRcvd = WritableUtils.readVLong(in);
-        latestGenerationStamp = WritableUtils.readVLong(in);
-      }
-    }
-
-    /** {@link Op#TRANSFER_BLOCK} header. */
-    public static class TransferBlockHeader extends ClientOperationHeader {
-      private DatanodeInfo[] targets;
-
-      /** Default constructor */
-      public TransferBlockHeader() {}
-
-      /** Constructor with all parameters */
-      public TransferBlockHeader(
-          final ExtendedBlock blk,
-          final Token<BlockTokenIdentifier> blockToken,
-          final String clientName,
-          final DatanodeInfo[] targets) throws IOException {
-        super(blk, blockToken, clientName);
-        this.targets = targets;
-      }
-
-      /** @return targets. */
-      public DatanodeInfo[] getTargets() {
-        return targets;
-      }
-
-      @Override
-      public void write(DataOutput out) throws IOException {
-        super.write(out);
-        Sender.write(out, 0, targets);
-      }
-
-      @Override
-      public void readFields(DataInput in) throws IOException {
-        super.readFields(in);
-        targets = Receiver.readDatanodeInfos(in);
-      }
-    }
-
-    /** {@link Op#REPLACE_BLOCK} header. */
-    public static class ReplaceBlockHeader extends BaseHeader {
-      private String delHint;
-      private DatanodeInfo source;
-
-      /** Default constructor */
-      public ReplaceBlockHeader() {}
-
-      /** Constructor with all parameters */
-      public ReplaceBlockHeader(final ExtendedBlock blk,
-          final Token<BlockTokenIdentifier> blockToken,
-          final String storageId,
-          final DatanodeInfo src) throws IOException {
-        super(blk, blockToken);
-        this.delHint = storageId;
-        this.source = src;
-      }
-
-      /** @return delete-hint. */
-      public String getDelHint() {
-        return delHint;
-      }
-
-      /** @return source datanode. */
-      public DatanodeInfo getSource() {
-        return source;
-      }
-
-      @Override
-      public void write(DataOutput out) throws IOException {
-        super.write(out);
-        Text.writeString(out, delHint);
-        source.write(out);
-      }
-
-      @Override
-      public void readFields(DataInput in) throws IOException {
-        super.readFields(in);
-        delHint = Text.readString(in);
-        source = DatanodeInfo.read(in);
-      }
-    }
-
-    /** {@link Op#COPY_BLOCK} header. */
-    public static class CopyBlockHeader extends BaseHeader {
-      /** Default constructor */
-      public CopyBlockHeader() {}
-
-      /** Constructor with all parameters */
-      public CopyBlockHeader(
-          final ExtendedBlock block,
-          final Token<BlockTokenIdentifier> blockToken) {
-        super(block, blockToken);
-      }
-    }
-
-    /** {@link Op#BLOCK_CHECKSUM} header. */
-    public static class BlockChecksumHeader extends BaseHeader {
-      /** Default constructor */
-      public BlockChecksumHeader() {}
-
-      /** Constructor with all parameters */
-      public BlockChecksumHeader(
-          final ExtendedBlock block,
-          final Token<BlockTokenIdentifier> blockToken) {
-        super(block, blockToken);
-      }
-    }
   }
-
-
-  /** Status */
-  public enum Status {
-    SUCCESS(0),
-    ERROR(1),
-    ERROR_CHECKSUM(2),
-    ERROR_INVALID(3),
-    ERROR_EXISTS(4),
-    ERROR_ACCESS_TOKEN(5),
-    CHECKSUM_OK(6);
-
-    /** The code for this operation. */
-    private final int code;
     
-    private Status(int code) {
-      this.code = code;
-    }
-
-    private static final int FIRST_CODE = values()[0].code;
-    /** Return the object represented by the code. */
-    private static Status valueOf(int code) {
-      final int i = code - FIRST_CODE;
-      return i < 0 || i >= values().length? null: values()[i];
-    }
-
-    /** Read from in */
-    public static Status read(DataInput in) throws IOException {
-      return valueOf(in.readShort());
-    }
-
-    /** Write to out */
-    public void write(DataOutput out) throws IOException {
-      out.writeShort(code);
-    }
-
-    /** Write to out */
-    public void writeOutputStream(OutputStream out) throws IOException {
-      out.write(new byte[] {(byte)(code >>> 8), (byte)code});
-    }
-  };
-  
   public enum BlockConstructionStage {
     /** The enumerates are always listed as regular stage followed by the
      * recovery stage. 
@@ -492,23 +148,9 @@ public interface DataTransferProtocol {
         return values()[ordinal()|RECOVERY_BIT];
       }
     }
-    
-    private static BlockConstructionStage valueOf(byte code) {
-      return code < 0 || code >= values().length? null: values()[code];
-    }
-    
-    /** Read from in */
-    private static BlockConstructionStage readFields(DataInput in)
-    throws IOException {
-      return valueOf(in.readByte());
-    }
-
-    /** write to out */
-    private void write(DataOutput out) throws IOException {
-      out.writeByte(ordinal());
-    }
   }    
 
+  
   /** Sender */
   @InterfaceAudience.Private
   @InterfaceStability.Evolving
@@ -520,11 +162,10 @@ public interface DataTransferProtocol {
       op.write(out);
     }
 
-    /** Send an operation request. */
     private static void send(final DataOutputStream out, final Op opcode,
-        final Op.BaseHeader parameters) throws IOException {
+        final Message proto) throws IOException {
       op(out, opcode);
-      parameters.write(out);
+      proto.writeDelimitedTo(out);
       out.flush();
     }
 
@@ -533,59 +174,90 @@ public interface DataTransferProtocol {
         long blockOffset, long blockLen, String clientName,
         Token<BlockTokenIdentifier> blockToken)
         throws IOException {
-      send(out, Op.READ_BLOCK, new Op.ReadBlockHeader(blk, blockToken,
-          clientName, blockOffset, blockLen));
+
+      OpReadBlockProto proto = OpReadBlockProto.newBuilder()
+        .setHeader(DataTransferProtoUtil.buildClientHeader(blk, clientName, blockToken))
+        .setOffset(blockOffset)
+        .setLen(blockLen)
+        .build();
+
+      send(out, Op.READ_BLOCK, proto);
     }
     
+
     /** Send OP_WRITE_BLOCK */
     public static void opWriteBlock(DataOutputStream out, ExtendedBlock blk,
         int pipelineSize, BlockConstructionStage stage, long newGs,
         long minBytesRcvd, long maxBytesRcvd, String client, DatanodeInfo src,
         DatanodeInfo[] targets, Token<BlockTokenIdentifier> blockToken)
         throws IOException {
-      send(out, Op.WRITE_BLOCK, new Op.WriteBlockHeader(blk, blockToken,
-          client, targets, src, stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
-          newGs));
+      ClientOperationHeaderProto header = DataTransferProtoUtil.buildClientHeader(blk, client,
+          blockToken);
+      
+      OpWriteBlockProto.Builder proto = OpWriteBlockProto.newBuilder()
+        .setHeader(header)
+        .addAllTargets(
+            toProtos(targets, 1))
+        .setStage(toProto(stage))
+        .setPipelineSize(pipelineSize)
+        .setMinBytesRcvd(minBytesRcvd)
+        .setMaxBytesRcvd(maxBytesRcvd)
+        .setLatestGenerationStamp(newGs);
+      
+      if (src != null) {
+        proto.setSource(toProto(src));
+      }
+
+      send(out, Op.WRITE_BLOCK, proto.build());
     }
 
     /** Send {@link Op#TRANSFER_BLOCK} */
     public static void opTransferBlock(DataOutputStream out, ExtendedBlock blk,
         String client, DatanodeInfo[] targets,
         Token<BlockTokenIdentifier> blockToken) throws IOException {
-      send(out, Op.TRANSFER_BLOCK, new Op.TransferBlockHeader(blk, blockToken,
-          client, targets));
+      
+      OpTransferBlockProto proto = OpTransferBlockProto.newBuilder()
+        .setHeader(DataTransferProtoUtil.buildClientHeader(
+            blk, client, blockToken))
+        .addAllTargets(toProtos(targets, 0))
+        .build();
+
+      send(out, Op.TRANSFER_BLOCK, proto);
     }
 
     /** Send OP_REPLACE_BLOCK */
     public static void opReplaceBlock(DataOutputStream out,
         ExtendedBlock blk, String delHint, DatanodeInfo src,
         Token<BlockTokenIdentifier> blockToken) throws IOException {
-      send(out, Op.REPLACE_BLOCK, new Op.ReplaceBlockHeader(blk, blockToken,
-          delHint, src));
+      OpReplaceBlockProto proto = OpReplaceBlockProto.newBuilder()
+        .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken))
+        .setDelHint(delHint)
+        .setSource(toProto(src))
+        .build();
+      
+      send(out, Op.REPLACE_BLOCK, proto);
     }
 
     /** Send OP_COPY_BLOCK */
     public static void opCopyBlock(DataOutputStream out, ExtendedBlock blk,
         Token<BlockTokenIdentifier> blockToken)
         throws IOException {
-      send(out, Op.COPY_BLOCK, new Op.CopyBlockHeader(blk, blockToken));
+      OpCopyBlockProto proto = OpCopyBlockProto.newBuilder()
+        .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken))
+        .build();
+      
+      send(out, Op.COPY_BLOCK, proto);
     }
 
     /** Send OP_BLOCK_CHECKSUM */
     public static void opBlockChecksum(DataOutputStream out, ExtendedBlock blk,
         Token<BlockTokenIdentifier> blockToken)
         throws IOException {
-      send(out, Op.BLOCK_CHECKSUM, new Op.BlockChecksumHeader(blk, blockToken));
-    }
-
-    /** Write an array of {@link DatanodeInfo} */
-    private static void write(final DataOutput out,
-        final int start, 
-        final DatanodeInfo[] datanodeinfos) throws IOException {
-      out.writeInt(datanodeinfos.length - start);
-      for (int i = start; i < datanodeinfos.length; i++) {
-        datanodeinfos[i].write(out);
-      }
+      OpBlockChecksumProto proto = OpBlockChecksumProto.newBuilder()
+        .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken))
+        .build();
+      
+      send(out, Op.BLOCK_CHECKSUM, proto);
     }
   }
 
@@ -631,12 +303,16 @@ public interface DataTransferProtocol {
 
     /** Receive OP_READ_BLOCK */
     private void opReadBlock(DataInputStream in) throws IOException {
-      final Op.ReadBlockHeader h = new Op.ReadBlockHeader();
-      h.readFields(in);
-      opReadBlock(in, h.getBlock(), h.getOffset(), h.getLength(),
-          h.getClientName(), h.getBlockToken());
-    }
+      OpReadBlockProto proto = OpReadBlockProto.parseFrom(vintPrefixed(in));
+      
+      ExtendedBlock b = fromProto(
+          proto.getHeader().getBaseHeader().getBlock());
+      Token<BlockTokenIdentifier> token = fromProto(
+          proto.getHeader().getBaseHeader().getToken());
 
+      opReadBlock(in, b, proto.getOffset(), proto.getLen(),
+          proto.getHeader().getClientName(), token);
+    }
     /**
      * Abstract OP_READ_BLOCK method. Read a block.
      */
@@ -646,12 +322,17 @@ public interface DataTransferProtocol {
     
     /** Receive OP_WRITE_BLOCK */
     private void opWriteBlock(DataInputStream in) throws IOException {
-      final Op.WriteBlockHeader h = new Op.WriteBlockHeader();
-      h.readFields(in);
-      opWriteBlock(in, h.getBlock(), h.getPipelineSize(), h.getStage(),
-          h.getLatestGenerationStamp(),
-          h.getMinBytesRcvd(), h.getMaxBytesRcvd(),
-          h.getClientName(), h.getSource(), h.getTargets(), h.getBlockToken());
+      final OpWriteBlockProto proto = OpWriteBlockProto.parseFrom(vintPrefixed(in));
+      opWriteBlock(in,
+          fromProto(proto.getHeader().getBaseHeader().getBlock()),
+          proto.getPipelineSize(),
+          fromProto(proto.getStage()),
+          proto.getLatestGenerationStamp(),
+          proto.getMinBytesRcvd(), proto.getMaxBytesRcvd(),
+          proto.getHeader().getClientName(),
+          proto.hasSource() ? fromProto(proto.getSource()) : null, // Sender omits source when src is null
+          fromProtos(proto.getTargetsList()),
+          fromProto(proto.getHeader().getBaseHeader().getToken()));
     }
 
     /**
@@ -666,10 +347,14 @@ public interface DataTransferProtocol {
 
     /** Receive {@link Op#TRANSFER_BLOCK} */
     private void opTransferBlock(DataInputStream in) throws IOException {
-      final Op.TransferBlockHeader h = new Op.TransferBlockHeader();
-      h.readFields(in);
-      opTransferBlock(in, h.getBlock(), h.getClientName(), h.getTargets(),
-          h.getBlockToken());
+      final OpTransferBlockProto proto =
+        OpTransferBlockProto.parseFrom(vintPrefixed(in));
+
+      opTransferBlock(in,
+          fromProto(proto.getHeader().getBaseHeader().getBlock()),
+          proto.getHeader().getClientName(),
+          fromProtos(proto.getTargetsList()),
+          fromProto(proto.getHeader().getBaseHeader().getToken()));
     }
 
     /**
@@ -684,10 +369,13 @@ public interface DataTransferProtocol {
 
     /** Receive OP_REPLACE_BLOCK */
     private void opReplaceBlock(DataInputStream in) throws IOException {
-      final Op.ReplaceBlockHeader h = new Op.ReplaceBlockHeader();
-      h.readFields(in);
-      opReplaceBlock(in, h.getBlock(), h.getDelHint(), h.getSource(),
-          h.getBlockToken());
+      OpReplaceBlockProto proto = OpReplaceBlockProto.parseFrom(vintPrefixed(in));
+
+      opReplaceBlock(in,
+          fromProto(proto.getHeader().getBlock()),
+          proto.getDelHint(),
+          fromProto(proto.getSource()),
+          fromProto(proto.getHeader().getToken()));
     }
 
     /**
@@ -700,9 +388,11 @@ public interface DataTransferProtocol {
 
     /** Receive OP_COPY_BLOCK */
     private void opCopyBlock(DataInputStream in) throws IOException {
-      final Op.CopyBlockHeader h = new Op.CopyBlockHeader();
-      h.readFields(in);
-      opCopyBlock(in, h.getBlock(), h.getBlockToken());
+      OpCopyBlockProto proto = OpCopyBlockProto.parseFrom(vintPrefixed(in));
+      
+      opCopyBlock(in,
+          fromProto(proto.getHeader().getBlock()),
+          fromProto(proto.getHeader().getToken()));
     }
 
     /**
@@ -715,9 +405,11 @@ public interface DataTransferProtocol {
 
     /** Receive OP_BLOCK_CHECKSUM */
     private void opBlockChecksum(DataInputStream in) throws IOException {
-      final Op.BlockChecksumHeader h = new Op.BlockChecksumHeader();
-      h.readFields(in);
-      opBlockChecksum(in, h.getBlock(), h.getBlockToken());
+      OpBlockChecksumProto proto = OpBlockChecksumProto.parseFrom(vintPrefixed(in));
+      
+      opBlockChecksum(in,
+          fromProto(proto.getHeader().getBlock()),
+          fromProto(proto.getHeader().getToken()));
     }
 
     /**
@@ -727,29 +419,13 @@ public interface DataTransferProtocol {
     protected abstract void opBlockChecksum(DataInputStream in,
         ExtendedBlock blk, Token<BlockTokenIdentifier> blockToken)
         throws IOException;
-
-    /** Read an array of {@link DatanodeInfo} */
-    private static DatanodeInfo[] readDatanodeInfos(final DataInput in
-        ) throws IOException {
-      final int n = in.readInt();
-      if (n < 0) {
-        throw new IOException("Mislabelled incoming datastream: "
-            + n + " = n < 0");
-      }
-      final DatanodeInfo[] datanodeinfos= new DatanodeInfo[n];
-      for (int i = 0; i < datanodeinfos.length; i++) {
-        datanodeinfos[i] = DatanodeInfo.read(in);
-      }
-      return datanodeinfos;
-    }
   }
   
   /** reply **/
   @InterfaceAudience.Private
   @InterfaceStability.Evolving
-  public static class PipelineAck implements Writable {
-    private long seqno;
-    private Status replies[];
+  public static class PipelineAck {
+    PipelineAckProto proto;
     public final static long UNKOWN_SEQNO = -2;
 
     /** default constructor **/
@@ -762,8 +438,10 @@ public interface DataTransferProtocol {
      * @param replies an array of replies
      */
     public PipelineAck(long seqno, Status[] replies) {
-      this.seqno = seqno;
-      this.replies = replies;
+      proto = PipelineAckProto.newBuilder()
+        .setSeqno(seqno)
+        .addAllStatus(Arrays.asList(replies))
+        .build();
     }
     
     /**
@@ -771,7 +449,7 @@ public interface DataTransferProtocol {
      * @return the sequence number
      */
     public long getSeqno() {
-      return seqno;
+      return proto.getSeqno();
     }
     
     /**
@@ -779,7 +457,7 @@ public interface DataTransferProtocol {
      * @return the number of replies
      */
     public short getNumOfReplies() {
-      return (short)replies.length;
+      return (short)proto.getStatusCount();
     }
     
     /**
@@ -787,11 +465,7 @@ public interface DataTransferProtocol {
      * @return the the ith reply
      */
     public Status getReply(int i) {
-      if (i<0 || i>=replies.length) {
-        throw new IllegalArgumentException("The input parameter " + i + 
-            " should in the range of [0, " + replies.length);
-      }
-      return replies[i];
+      return proto.getStatus(i);
     }
     
     /**
@@ -799,8 +473,8 @@ public interface DataTransferProtocol {
      * @return true if all statuses are SUCCESS
      */
     public boolean isSuccess() {
-      for (Status reply : replies) {
-        if (reply != Status.SUCCESS) {
+      for (DataTransferProtos.Status reply : proto.getStatusList()) {
+        if (reply != DataTransferProtos.Status.SUCCESS) {
           return false;
         }
       }
@@ -808,54 +482,37 @@ public interface DataTransferProtocol {
     }
     
     /**** Writable interface ****/
-    @Override // Writable
-    public void readFields(DataInput in) throws IOException {
-      seqno = in.readLong();
-      short numOfReplies = in.readShort();
-      replies = new Status[numOfReplies];
-      for (int i=0; i<numOfReplies; i++) {
-        replies[i] = Status.read(in);
-      }
+    public void readFields(InputStream in) throws IOException {
+      proto = PipelineAckProto.parseFrom(vintPrefixed(in));
     }
 
-    @Override // Writable
-    public void write(DataOutput out) throws IOException {
-      //WritableUtils.writeVLong(out, seqno);
-      out.writeLong(seqno);
-      out.writeShort((short)replies.length);
-      for(Status reply : replies) {
-        reply.write(out);
-      }
+    public void write(OutputStream out) throws IOException {
+      proto.writeDelimitedTo(out);
     }
     
     @Override //Object
     public String toString() {
-      StringBuilder ack = new StringBuilder("Replies for seqno ");
-      ack.append( seqno ).append( " are" );
-      for(Status reply : replies) {
-        ack.append(" ");
-        ack.append(reply);
-      }
-      return ack.toString();
+      return proto.toString();
     }
   }
 
   /**
    * Header data for each packet that goes through the read/write pipelines.
    */
-  public static class PacketHeader implements Writable {
+  public static class PacketHeader {
     /** Header size for a packet */
-    public static final int PKT_HEADER_LEN = ( 4 + /* Packet payload length */
-                                               8 + /* offset in block */
-                                               8 + /* seqno */
-                                               1 + /* isLastPacketInBlock */
-                                               4   /* data length */ );
+    private static final int PROTO_SIZE = 
+      PacketHeaderProto.newBuilder()
+        .setOffsetInBlock(0)
+        .setSeqno(0)
+        .setLastPacketInBlock(false)
+        .setDataLen(0)
+        .build().getSerializedSize();
+    public static final int PKT_HEADER_LEN =
+      6 + PROTO_SIZE;
 
     private int packetLen;
-    private long offsetInBlock;
-    private long seqno;
-    private boolean lastPacketInBlock;
-    private int dataLen;
+    private PacketHeaderProto proto;
 
     public PacketHeader() {
     }
@@ -863,26 +520,28 @@ public interface DataTransferProtocol {
     public PacketHeader(int packetLen, long offsetInBlock, long seqno,
                         boolean lastPacketInBlock, int dataLen) {
       this.packetLen = packetLen;
-      this.offsetInBlock = offsetInBlock;
-      this.seqno = seqno;
-      this.lastPacketInBlock = lastPacketInBlock;
-      this.dataLen = dataLen;
+      proto = PacketHeaderProto.newBuilder()
+        .setOffsetInBlock(offsetInBlock)
+        .setSeqno(seqno)
+        .setLastPacketInBlock(lastPacketInBlock)
+        .setDataLen(dataLen)
+        .build();
     }
 
     public int getDataLen() {
-      return dataLen;
+      return proto.getDataLen();
     }
 
     public boolean isLastPacketInBlock() {
-      return lastPacketInBlock;
+      return proto.getLastPacketInBlock();
     }
 
     public long getSeqno() {
-      return seqno;
+      return proto.getSeqno();
     }
 
     public long getOffsetInBlock() {
-      return offsetInBlock;
+      return proto.getOffsetInBlock();
     }
 
     public int getPacketLen() {
@@ -891,57 +550,50 @@ public interface DataTransferProtocol {
 
     @Override
     public String toString() {
-      StringBuilder sb = new StringBuilder();
-      sb.append("PacketHeader(")
-        .append("packetLen=").append(packetLen)
-        .append(" offsetInBlock=").append(offsetInBlock)
-        .append(" seqno=").append(seqno)
-        .append(" lastPacketInBlock=").append(lastPacketInBlock)
-        .append(" dataLen=").append(dataLen)
-        .append(")");
-      return sb.toString();
+      return "PacketHeader with packetLen=" + packetLen +
+        " Header data: " + // leading space keeps the label from fusing with the packetLen digits
+        proto.toString();
     }
-
-    @Override
-    public void readFields(DataInput in) throws IOException {
-      // Note that it's important for packetLen to come first and not
-      // change format -
-      // this is used by BlockReceiver to read entire packets with
-      // a single read call.
-      packetLen = in.readInt();
-      offsetInBlock = in.readLong();
-      seqno = in.readLong();
-      lastPacketInBlock = in.readBoolean();
-      dataLen = in.readInt();
-    }
-
+    
     public void readFields(ByteBuffer buf) throws IOException {
       packetLen = buf.getInt();
-      offsetInBlock = buf.getLong();
-      seqno = buf.getLong();
-      lastPacketInBlock = (buf.get() != 0);
-      dataLen = buf.getInt();
+      int protoLen = buf.getShort() & 0xFFFF; // unsigned 16-bit length; a signed read could yield a negative array size
+      byte[] data = new byte[protoLen];
+      buf.get(data);
+      proto = PacketHeaderProto.parseFrom(data);
     }
-
-    @Override
-    public void write(DataOutput out) throws IOException {
-      out.writeInt(packetLen);
-      out.writeLong(offsetInBlock);
-      out.writeLong(seqno);
-      out.writeBoolean(lastPacketInBlock);
-      out.writeInt(dataLen);
+    
+    public void readFields(DataInputStream in) throws IOException {
+      this.packetLen = in.readInt();
+      int protoLen = in.readUnsignedShort(); // mirrors writeShort(); never negative, so bad input fails as an IOException
+      byte[] data = new byte[protoLen];
+      in.readFully(data);
+      proto = PacketHeaderProto.parseFrom(data);
     }
 
+
     /**
      * Write the header into the buffer.
      * This requires that PKT_HEADER_LEN bytes are available.
      */
-    public void putInBuffer(ByteBuffer buf) {
-      buf.putInt(packetLen)
-        .putLong(offsetInBlock)
-        .putLong(seqno)
-        .put((byte)(lastPacketInBlock ? 1 : 0))
-        .putInt(dataLen);
+    public void putInBuffer(final ByteBuffer buf) {
+      assert proto.getSerializedSize() == PROTO_SIZE
+        : "Expected " + (PROTO_SIZE) + " got: " + proto.getSerializedSize();
+      try {
+        buf.putInt(packetLen);
+        buf.putShort((short) proto.getSerializedSize());
+        proto.writeTo(new ByteBufferOutputStream(buf));
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+    
+    public void write(DataOutputStream out) throws IOException {
+      assert proto.getSerializedSize() == PROTO_SIZE
+      : "Expected " + (PROTO_SIZE) + " got: " + proto.getSerializedSize();
+      out.writeInt(packetLen);
+      out.writeShort(proto.getSerializedSize());
+      proto.writeTo(out);
     }
 
     /**
@@ -951,11 +603,11 @@ public interface DataTransferProtocol {
      */
     public boolean sanityCheck(long lastSeqNo) {
       // We should only have a non-positive data length for the last packet
-      if (dataLen <= 0 && lastPacketInBlock) return false;
+      if (proto.getDataLen() <= 0 && !proto.getLastPacketInBlock()) return false;
       // The last packet should not contain data
-      if (lastPacketInBlock && dataLen != 0) return false;
+      if (proto.getLastPacketInBlock() && proto.getDataLen() != 0) return false;
       // Seqnos should always increase by 1 with each packet received
-      if (seqno != lastSeqNo + 1) return false;
+      if (proto.getSeqno() != lastSeqNo + 1) return false;
       return true;
     }
 
@@ -963,16 +615,12 @@ public interface DataTransferProtocol {
     public boolean equals(Object o) {
       if (!(o instanceof PacketHeader)) return false;
       PacketHeader other = (PacketHeader)o;
-      return (other.packetLen == packetLen &&
-              other.offsetInBlock == offsetInBlock &&
-              other.seqno == seqno &&
-              other.lastPacketInBlock == lastPacketInBlock &&
-              other.dataLen == dataLen);
+      return other.packetLen == packetLen && this.proto.equals(other.proto); // packetLen lives outside the proto
     }
 
     @Override
     public int hashCode() {
-      return (int)seqno;
+      return (int)proto.getSeqno();
     }
   }
 

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java?rev=1134492&r1=1134491&r2=1134492&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java Sat Jun 11 00:36:12 2011
@@ -159,6 +159,11 @@ public class DatanodeInfo extends Datano
   public void setCapacity(long capacity) { 
     this.capacity = capacity; 
   }
+  
+  /** Sets the used space for the datanode. */
+  public void setDfsUsed(long dfsUsed) {
+    this.dfsUsed = dfsUsed;
+  }
 
   /** Sets raw free space. */
   public void setRemaining(long remaining) { 

Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/HdfsProtoUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/HdfsProtoUtil.java?rev=1134492&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/HdfsProtoUtil.java (added)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/HdfsProtoUtil.java Sat Jun 11 00:36:12 2011
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.util.ExactSizeInputStream;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.Token;
+
+import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.CodedInputStream;
+
+/**
+ * Utilities for converting to and from protocol buffers used in the
+ * HDFS wire protocol, as well as some generic utilities useful
+ * for dealing with protocol buffers.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public abstract class HdfsProtoUtil {
+
+  //// Block Token ////
+
+  /**
+   * Converts a block token into its protobuf form, copying the identifier,
+   * password, kind and service.
+   *
+   * @param blockToken the token to convert
+   * @return a newly built {@code BlockTokenIdentifierProto}
+   */
+  public static HdfsProtos.BlockTokenIdentifierProto toProto(Token<?> blockToken) {
+    return HdfsProtos.BlockTokenIdentifierProto.newBuilder()
+      .setIdentifier(ByteString.copyFrom(blockToken.getIdentifier()))
+      .setPassword(ByteString.copyFrom(blockToken.getPassword()))
+      .setKind(blockToken.getKind().toString())
+      .setService(blockToken.getService().toString())
+      .build();
+  }
+
+  /**
+   * Rebuilds a block token from its protobuf form.
+   *
+   * @param proto the protobuf message to convert
+   * @return a new token carrying the identifier, password, kind and service
+   *         stored in {@code proto}
+   */
+  public static Token<BlockTokenIdentifier> fromProto(HdfsProtos.BlockTokenIdentifierProto proto) {
+    return new Token<BlockTokenIdentifier>(proto.getIdentifier().toByteArray(),
+        proto.getPassword().toByteArray(),
+        new Text(proto.getKind()),
+        new Text(proto.getService()));
+  }
+
+  //// Extended Block ////
+
+  /**
+   * Converts an extended block into its protobuf form.
+   *
+   * @param block the block to convert
+   * @return a newly built {@code ExtendedBlockProto} holding the block id,
+   *         pool id, byte count and generation stamp
+   */
+  public static HdfsProtos.ExtendedBlockProto toProto(ExtendedBlock block) {
+    return HdfsProtos.ExtendedBlockProto.newBuilder()
+      .setBlockId(block.getBlockId())
+      .setPoolId(block.getBlockPoolId())
+      .setNumBytes(block.getNumBytes())
+      .setGenerationStamp(block.getGenerationStamp())
+      .build();
+  }
+
+  /**
+   * Rebuilds an extended block from its protobuf form.
+   *
+   * @param proto the protobuf message to convert
+   * @return a new {@code ExtendedBlock} built from the fields of {@code proto}
+   */
+  public static ExtendedBlock fromProto(HdfsProtos.ExtendedBlockProto proto) {
+    return new ExtendedBlock(
+        proto.getPoolId(), proto.getBlockId(),
+        proto.getNumBytes(), proto.getGenerationStamp());
+  }
+
+  //// DatanodeID ////
+
+  /** Converts a datanode ID (name, storage ID, info port) to protobuf form. */
+  private static HdfsProtos.DatanodeIDProto toProto(
+      DatanodeID dni) {
+    return HdfsProtos.DatanodeIDProto.newBuilder()
+      .setName(dni.getName())
+      .setStorageID(dni.getStorageID())
+      .setInfoPort(dni.getInfoPort())
+      .build();
+  }
+
+  /**
+   * Rebuilds a datanode ID from its protobuf form. The IPC port is not part
+   * of the conversion, so it is passed as -1.
+   */
+  private static DatanodeID fromProto(HdfsProtos.DatanodeIDProto idProto) {
+    return new DatanodeID(
+        idProto.getName(),
+        idProto.getStorageID(),
+        idProto.getInfoPort(),
+        -1); // ipc port not serialized in writables either
+  }
+
+  //// DatanodeInfo ////
+
+  /**
+   * Converts a datanode descriptor into its protobuf form. The admin state
+   * is mapped to the protobuf enum by constant name.
+   *
+   * @param dni the datanode info to convert
+   * @return a newly built {@code DatanodeInfoProto}
+   */
+  public static HdfsProtos.DatanodeInfoProto toProto(DatanodeInfo dni) {
+    return HdfsProtos.DatanodeInfoProto.newBuilder()
+      .setId(toProto((DatanodeID)dni))
+      .setCapacity(dni.getCapacity())
+      .setDfsUsed(dni.getDfsUsed())
+      .setRemaining(dni.getRemaining())
+      .setBlockPoolUsed(dni.getBlockPoolUsed())
+      .setLastUpdate(dni.getLastUpdate())
+      .setXceiverCount(dni.getXceiverCount())
+      .setLocation(dni.getNetworkLocation())
+      .setHostName(dni.getHostName())
+      .setAdminState(HdfsProtos.DatanodeInfoProto.AdminState.valueOf(
+          dni.getAdminState().name()))
+      .build();
+  }
+
+  /**
+   * Rebuilds a datanode descriptor from its protobuf form. The admin state
+   * is mapped back from the protobuf enum by constant name.
+   *
+   * @param dniProto the protobuf message to convert
+   * @return a new {@code DatanodeInfo} populated from {@code dniProto}
+   */
+  public static DatanodeInfo fromProto(HdfsProtos.DatanodeInfoProto dniProto) {
+    DatanodeInfo dniObj = new DatanodeInfo(fromProto(dniProto.getId()),
+        dniProto.getLocation(), dniProto.getHostName());
+
+    dniObj.setCapacity(dniProto.getCapacity());
+    dniObj.setDfsUsed(dniProto.getDfsUsed());
+    dniObj.setRemaining(dniProto.getRemaining());
+    dniObj.setBlockPoolUsed(dniProto.getBlockPoolUsed());
+    dniObj.setLastUpdate(dniProto.getLastUpdate());
+    dniObj.setXceiverCount(dniProto.getXceiverCount());
+    dniObj.setAdminState(DatanodeInfo.AdminStates.valueOf(
+        dniProto.getAdminState().name()));
+    return dniObj;
+  }
+
+  /**
+   * Converts the datanodes at index {@code startIdx} and above into
+   * protobuf form, preserving their order.
+   *
+   * @param dnInfos the datanodes to convert
+   * @param startIdx index of the first element of {@code dnInfos} to convert
+   * @return the converted elements; empty if {@code startIdx} is at or past
+   *         the end of {@code dnInfos}
+   */
+  public static ArrayList<? extends HdfsProtos.DatanodeInfoProto> toProtos(
+      DatanodeInfo[] dnInfos, int startIdx) {
+    // Only the elements from startIdx onward are added, so size for those.
+    ArrayList<HdfsProtos.DatanodeInfoProto> protos =
+      Lists.newArrayListWithCapacity(Math.max(0, dnInfos.length - startIdx));
+    for (int i = startIdx; i < dnInfos.length; i++) {
+      protos.add(toProto(dnInfos[i]));
+    }
+    return protos;
+  }
+
+  /**
+   * Converts a list of protobuf datanode descriptors into an array, in
+   * list order.
+   *
+   * @param targetsList the protobuf messages to convert
+   * @return an array with one {@code DatanodeInfo} per list element
+   */
+  public static DatanodeInfo[] fromProtos(
+      List<HdfsProtos.DatanodeInfoProto> targetsList) {
+    DatanodeInfo[] ret = new DatanodeInfo[targetsList.size()];
+    int i = 0;
+    for (HdfsProtos.DatanodeInfoProto proto : targetsList) {
+      ret[i++] = fromProto(proto);
+    }
+    return ret;
+  }
+
+  /**
+   * Reads a varint32 length prefix from {@code input} and wraps the stream
+   * in an {@link ExactSizeInputStream} constructed with that length.
+   *
+   * @param input the stream to read the length prefix from
+   * @return a stream over {@code input} sized by the decoded prefix
+   * @throws EOFException if {@code input} is at end of stream before any
+   *         prefix byte is read
+   * @throws IOException if reading the prefix fails or it decodes to a
+   *         negative length
+   */
+  public static InputStream vintPrefixed(final InputStream input)
+  throws IOException {
+    final int firstByte = input.read();
+    if (firstByte == -1) {
+      throw new EOFException("Premature EOF: no length prefix available");
+    }
+
+    final int size = CodedInputStream.readRawVarint32(firstByte, input);
+    // The length comes off the wire: reject a negative value explicitly
+    // instead of relying on an assert that is a no-op without -ea.
+    if (size < 0) {
+      throw new IOException("Invalid negative length prefix: " + size);
+    }
+
+    return new ExactSizeInputStream(input, size);
+  }
+}
\ No newline at end of file