You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2011/06/11 02:36:13 UTC

svn commit: r1134492 [4/4] - in /hadoop/hdfs/trunk: ./ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/apache/hadoop/hdfs/protocol/proto/ src/java/org/apache/hadoop/hdfs/server/balancer/ src/java/org/apache/hadoo...

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1134492&r1=1134491&r2=1134492&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Sat Jun 11 00:36:12 2011
@@ -17,8 +17,6 @@
  */
 package org.apache.hadoop.hdfs.server.balancer;
 
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR_ACCESS_TOKEN;
-
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
@@ -58,6 +56,9 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.Util;
@@ -73,6 +74,8 @@ import org.apache.hadoop.util.StringUtil
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
+import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed;
+
 /** <p>The balancer is a tool that balances disk space usage on an HDFS cluster
  * when some datanodes become full or when new empty nodes join the cluster.
  * The tool is deployed as an application program that can be run by the 
@@ -352,9 +355,10 @@ public class Balancer {
     
     /* Receive a block copy response from the input stream */ 
     private void receiveResponse(DataInputStream in) throws IOException {
-      DataTransferProtocol.Status status = DataTransferProtocol.Status.read(in);
-      if (status != DataTransferProtocol.Status.SUCCESS) {
-        if (status == ERROR_ACCESS_TOKEN)
+      BlockOpResponseProto response = BlockOpResponseProto.parseFrom(
+          vintPrefixed(in));
+      if (response.getStatus() != Status.SUCCESS) {
+        if (response.getStatus() == Status.ERROR_ACCESS_TOKEN)
           throw new IOException("block move failed due to access token error");
         throw new IOException("block move is failed");
       }

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1134492&r1=1134491&r2=1134492&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Sat Jun 11 00:36:12 2011
@@ -17,8 +17,6 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.SUCCESS;
 import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT;
 
 import java.io.BufferedOutputStream;
@@ -39,11 +37,11 @@ import org.apache.hadoop.fs.FSOutputSumm
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
@@ -996,13 +994,13 @@ class BlockReceiver implements Closeable
             Status[] replies = null;
             if (mirrorError) { // ack read error
               replies = new Status[2];
-              replies[0] = SUCCESS;
-              replies[1] = ERROR;
+              replies[0] = Status.SUCCESS;
+              replies[1] = Status.ERROR;
             } else {
               short ackLen = type == PacketResponderType.LAST_IN_PIPELINE? 0
                   : ack.getNumOfReplies();
               replies = new Status[1+ackLen];
-              replies[0] = SUCCESS;
+              replies[0] = Status.SUCCESS;
               for (int i=0; i<ackLen; i++) {
                 replies[i+1] = ack.getReply(i);
               }

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1134492&r1=1134491&r2=1134492&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Sat Jun 11 00:36:12 2011
@@ -19,8 +19,6 @@ package org.apache.hadoop.hdfs.server.da
 
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR_ACCESS_TOKEN;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.SUCCESS;
 import static org.apache.hadoop.hdfs.server.common.Util.now;
 
 import java.io.BufferedOutputStream;
@@ -75,8 +73,11 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DNTransferAckProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
@@ -133,6 +134,7 @@ import org.apache.hadoop.util.StringUtil
 import org.apache.hadoop.util.VersionInfo;
 import org.mortbay.util.ajax.JSON;
 
+
 /**********************************************************
  * DataNode is a class (and program) that stores a set of
  * blocks for a DFS deployment.  A single deployment can
@@ -1958,12 +1960,13 @@ public class DataNode extends Configured
         // read ack
         if (isClient) {
           in = new DataInputStream(NetUtils.getInputStream(sock));
-          final DataTransferProtocol.Status s = DataTransferProtocol.Status.read(in);
+          DNTransferAckProto closeAck = DNTransferAckProto.parseFrom(
+              HdfsProtoUtil.vintPrefixed(in));
           if (LOG.isDebugEnabled()) {
-            LOG.debug(getClass().getSimpleName() + ": close-ack=" + s);
+            LOG.debug(getClass().getSimpleName() + ": close-ack=" + closeAck);
           }
-          if (s != SUCCESS) {
-            if (s == ERROR_ACCESS_TOKEN) {
+          if (closeAck.getStatus() != Status.SUCCESS) {
+            if (closeAck.getStatus() == Status.ERROR_ACCESS_TOKEN) {
               throw new InvalidBlockTokenException(
                   "Got access token error for connect ack, targets="
                    + Arrays.asList(targets));

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1134492&r1=1134491&r2=1134492&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Sat Jun 11 00:36:12 2011
@@ -17,9 +17,8 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR_ACCESS_TOKEN;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.SUCCESS;
+import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.*;
+import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import static org.apache.hadoop.hdfs.server.common.Util.now;
 import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT;
 
@@ -44,6 +43,11 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
+
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
@@ -51,7 +55,6 @@ import org.apache.hadoop.hdfs.server.dat
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.MD5Hash;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.metrics2.lib.MutableCounterLong;
 import org.apache.hadoop.metrics2.lib.MutableRate;
 import org.apache.hadoop.net.NetUtils;
@@ -60,6 +63,9 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.StringUtils;
 
+import com.google.protobuf.ByteString;
+
+
 /**
  * Thread for processing incoming/outgoing data stream.
  */
@@ -224,16 +230,19 @@ class DataXceiver extends DataTransferPr
         sendResponse(s, ERROR, datanode.socketWriteTimeout);
         throw e;
       }
+      
+      // send op status
+      sendResponse(s, SUCCESS, datanode.socketWriteTimeout);
 
-      SUCCESS.write(out); // send op status
       long read = blockSender.sendBlock(out, baseStream, null); // send data
 
       if (blockSender.didSendEntireByteRange()) {
         // If we sent the entire range, then we should expect the client
         // to respond with a Status enum.
         try {
-          DataTransferProtocol.Status stat = DataTransferProtocol.Status.read(in);
-          if (stat == null) {
+          ClientReadStatusProto stat = ClientReadStatusProto.parseFrom(
+              HdfsProtoUtil.vintPrefixed(in));
+          if (!stat.hasStatus()) {
             LOG.warn("Client " + s.getInetAddress() + " did not send a valid status " +
                      "code after reading. Will close connection.");
             IOUtils.closeStream(out);
@@ -329,7 +338,7 @@ class DataXceiver extends DataTransferPr
     BlockReceiver blockReceiver = null; // responsible for data handling
     String mirrorNode = null;           // the name:port of next target
     String firstBadLink = "";           // first datanode that failed in connection setup
-    DataTransferProtocol.Status mirrorInStatus = SUCCESS;
+    Status mirrorInStatus = SUCCESS;
     try {
       if (isDatanode || 
           stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
@@ -377,8 +386,10 @@ class DataXceiver extends DataTransferPr
 
           // read connect ack (only for clients, not for replication req)
           if (isClient) {
-            mirrorInStatus = DataTransferProtocol.Status.read(mirrorIn);
-            firstBadLink = Text.readString(mirrorIn);
+            BlockOpResponseProto connectAck =
+              BlockOpResponseProto.parseFrom(HdfsProtoUtil.vintPrefixed(mirrorIn));
+            mirrorInStatus = connectAck.getStatus();
+            firstBadLink = connectAck.getFirstBadLink();
             if (LOG.isDebugEnabled() || mirrorInStatus != SUCCESS) {
               LOG.info("Datanode " + targets.length +
                        " got response for connect ack " +
@@ -389,8 +400,11 @@ class DataXceiver extends DataTransferPr
 
         } catch (IOException e) {
           if (isClient) {
-            ERROR.write(replyOut);
-            Text.writeString(replyOut, mirrorNode);
+            BlockOpResponseProto.newBuilder()
+              .setStatus(ERROR)
+              .setFirstBadLink(mirrorNode)
+              .build()
+              .writeDelimitedTo(replyOut);
             replyOut.flush();
           }
           IOUtils.closeStream(mirrorOut);
@@ -417,8 +431,11 @@ class DataXceiver extends DataTransferPr
                    " forwarding connect ack to upstream firstbadlink is " +
                    firstBadLink);
         }
-        mirrorInStatus.write(replyOut);
-        Text.writeString(replyOut, firstBadLink);
+        BlockOpResponseProto.newBuilder()
+          .setStatus(mirrorInStatus)
+          .setFirstBadLink(firstBadLink)
+          .build()
+          .writeDelimitedTo(replyOut);
         replyOut.flush();
       }
 
@@ -433,7 +450,7 @@ class DataXceiver extends DataTransferPr
           if (LOG.isTraceEnabled()) {
             LOG.trace("TRANSFER: send close-ack");
           }
-          SUCCESS.write(replyOut);
+          writeResponse(SUCCESS, replyOut);
         }
       }
 
@@ -489,12 +506,12 @@ class DataXceiver extends DataTransferPr
         NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
     try {
       datanode.transferReplicaForPipelineRecovery(blk, targets, client);
-      SUCCESS.write(out);
+      writeResponse(Status.SUCCESS, out);
     } finally {
       IOUtils.closeStream(out);
     }
   }
-
+  
   /**
    * Get block checksum (MD5 of CRC32).
    */
@@ -530,10 +547,15 @@ class DataXceiver extends DataTransferPr
       }
 
       //write reply
-      SUCCESS.write(out);
-      out.writeInt(bytesPerCRC);
-      out.writeLong(crcPerBlock);
-      md5.write(out);
+      BlockOpResponseProto.newBuilder()
+        .setStatus(SUCCESS)
+        .setChecksumResponse(OpBlockChecksumResponseProto.newBuilder()             
+          .setBytesPerCrc(bytesPerCRC)
+          .setCrcPerBlock(crcPerBlock)
+          .setMd5(ByteString.copyFrom(md5.getDigest()))
+          )
+        .build()
+        .writeDelimitedTo(out);
       out.flush();
     } finally {
       IOUtils.closeStream(out);
@@ -590,7 +612,7 @@ class DataXceiver extends DataTransferPr
           baseStream, SMALL_BUFFER_SIZE));
 
       // send status first
-      SUCCESS.write(reply);
+      writeResponse(SUCCESS, reply);
       // send block content to the target
       long read = blockSender.sendBlock(reply, baseStream, 
                                         dataXceiverServer.balanceThrottler);
@@ -653,7 +675,7 @@ class DataXceiver extends DataTransferPr
 
     Socket proxySock = null;
     DataOutputStream proxyOut = null;
-    DataTransferProtocol.Status opStatus = SUCCESS;
+    Status opStatus = SUCCESS;
     BlockReceiver blockReceiver = null;
     DataInputStream proxyReply = null;
     
@@ -676,10 +698,11 @@ class DataXceiver extends DataTransferPr
       // receive the response from the proxy
       proxyReply = new DataInputStream(new BufferedInputStream(
           NetUtils.getInputStream(proxySock), BUFFER_SIZE));
-      final DataTransferProtocol.Status status
-          = DataTransferProtocol.Status.read(proxyReply);
-      if (status != SUCCESS) {
-        if (status == ERROR_ACCESS_TOKEN) {
+      BlockOpResponseProto copyResponse = BlockOpResponseProto.parseFrom(
+          HdfsProtoUtil.vintPrefixed(proxyReply));
+
+      if (copyResponse.getStatus() != SUCCESS) {
+        if (copyResponse.getStatus() == ERROR_ACCESS_TOKEN) {
           throw new IOException("Copy block " + block + " from "
               + proxySock.getRemoteSocketAddress()
               + " failed due to access token error");
@@ -748,13 +771,24 @@ class DataXceiver extends DataTransferPr
    * @param opStatus status message to write
    * @param timeout send timeout
    **/
-  private void sendResponse(Socket s, DataTransferProtocol.Status opStatus,
+  private void sendResponse(Socket s, Status status,
       long timeout) throws IOException {
     DataOutputStream reply = 
       new DataOutputStream(NetUtils.getOutputStream(s, timeout));
-    opStatus.write(reply);
-    reply.flush();
+    
+    writeResponse(status, reply);
+  }
+  
+  private void writeResponse(Status status, OutputStream out)
+  throws IOException {
+    BlockOpResponseProto response = BlockOpResponseProto.newBuilder()
+      .setStatus(status)
+      .build();
+    
+    response.writeDelimitedTo(out);
+    out.flush();
   }
+  
 
   private void checkAccess(DataOutputStream out, final boolean reply, 
       final ExtendedBlock blk,
@@ -771,12 +805,15 @@ class DataXceiver extends DataTransferPr
               out = new DataOutputStream(
                   NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
             }
-            ERROR_ACCESS_TOKEN.write(out);
+            
+            BlockOpResponseProto.Builder resp = BlockOpResponseProto.newBuilder()
+              .setStatus(ERROR_ACCESS_TOKEN);
             if (mode == BlockTokenSecretManager.AccessMode.WRITE) {
               DatanodeRegistration dnR = 
                 datanode.getDNRegistrationForBP(blk.getBlockPoolId());
-              Text.writeString(out, dnR.getName());
+              resp.setFirstBadLink(dnR.getName());
             }
+            resp.build().writeDelimitedTo(out);
             out.flush();
           }
           LOG.warn("Block token verification failed: op=" + op

Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/util/ByteBufferOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/util/ByteBufferOutputStream.java?rev=1134492&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/util/ByteBufferOutputStream.java (added)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/util/ByteBufferOutputStream.java Sat Jun 11 00:36:12 2011
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.util;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * OutputStream that writes into a {@link ByteBuffer}.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Stable
+public class ByteBufferOutputStream extends OutputStream {
+
+  private final ByteBuffer buf;
+  /** @param buf destination buffer; the reference is kept, not copied */
+  public ByteBufferOutputStream(ByteBuffer buf) {
+    this.buf = buf;
+  }
+  /** Puts {@code (byte)b}; a full buffer throws BufferOverflowException. */
+  @Override
+  public void write(int b) throws IOException {
+    buf.put((byte)b);
+  }
+  /** Bulk-puts {@code len} bytes of {@code b} starting at {@code off}. */
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    buf.put(b, off, len);
+  }
+}
\ No newline at end of file

Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/util/ExactSizeInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/util/ExactSizeInputStream.java?rev=1134492&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/util/ExactSizeInputStream.java (added)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/util/ExactSizeInputStream.java Sat Jun 11 00:36:12 2011
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.util;
+
+import java.io.EOFException;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * An InputStream implementation which reads from some other InputStream
+ * but expects an exact number of bytes. Any attempts to read past the
+ * specified number of bytes will return as if the end of the stream
+ * was reached. If the end of the underlying stream is reached prior to
+ * the specified number of bytes, an EOFException is thrown.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class ExactSizeInputStream extends FilterInputStream {
+  private int remaining;
+
+  /**
+   * Construct an input stream that will read no more than
+   * 'numBytes' bytes.
+   * 
+   * If an EOF occurs on the underlying stream before numBytes
+   * bytes have been read, an EOFException will be thrown.
+   * 
+   * @param in the inputstream to wrap
+   * @param numBytes the number of bytes to read
+   */
+  public ExactSizeInputStream(InputStream in, int numBytes) {
+    super(in);
+    Preconditions.checkArgument(numBytes >= 0,
+        "Negative expected bytes: ", numBytes);
+    this.remaining = numBytes;
+  }
+
+  @Override
+  public int available() throws IOException {
+    return Math.min(super.available(), remaining);
+  }
+
+  @Override
+  public int read() throws IOException {
+    // EOF if we reached our limit
+    if (remaining <= 0) {
+      return -1;
+    }
+    final int result = super.read();
+    if (result >= 0) {
+      --remaining;
+    } else if (remaining > 0) {
+      // Underlying stream reached EOF but we haven't read the expected
+      // number of bytes.
+      throw new EOFException(
+          "Premature EOF. Expected " + remaining + " more bytes");
+    }
+    return result;
+  }
+
+  @Override
+  public int read(final byte[] b, final int off, int len)
+                  throws IOException {
+    if (remaining <= 0) {
+      return -1;
+    }
+    len = Math.min(len, remaining);
+    final int result = super.read(b, off, len);
+    if (result >= 0) {
+      remaining -= result;
+    } else if (remaining > 0) {
+      // Underlying stream reached EOF but we haven't read the expected
+      // number of bytes.
+      throw new EOFException(
+          "Premature EOF. Expected " + remaining + " more bytes");
+    }
+    return result;
+  }
+  /** Skips at most {@code remaining} bytes; {@code n <= 0} skips nothing. */
+  @Override
+  public long skip(final long n) throws IOException {
+    final long result = super.skip(Math.max(0L, Math.min(n, remaining)));
+    if (result > 0) {
+      remaining -= result;
+    } else if (n > 0 && remaining > 0) {
+      // A positive request that skipped nothing is treated as EOF on the
+      // underlying stream before the expected bytes were consumed.
+      throw new EOFException(
+          "Premature EOF. Expected " + remaining + " more bytes");
+    }
+    return result;
+  }
+  
+  @Override
+  public boolean markSupported() {
+    return false;
+  }
+
+  @Override
+  public void mark(int readlimit) {
+    throw new UnsupportedOperationException();
+  }
+  
+}
\ No newline at end of file

Added: hadoop/hdfs/trunk/src/proto/datatransfer.proto
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/proto/datatransfer.proto?rev=1134492&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/proto/datatransfer.proto (added)
+++ hadoop/hdfs/trunk/src/proto/datatransfer.proto Sat Jun 11 00:36:12 2011
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// This file contains protocol buffers that are used to transfer data
+// to and from the datanode, as well as between datanodes.
+
+option java_package = "org.apache.hadoop.hdfs.protocol.proto";
+option java_outer_classname = "DataTransferProtos";
+option java_generate_equals_and_hash = true;
+
+import "hdfs.proto";
+
+message BaseHeaderProto {
+  required ExtendedBlockProto block = 1;
+  optional BlockTokenIdentifierProto token = 2;
+}
+
+message ClientOperationHeaderProto {
+  required BaseHeaderProto baseHeader = 1;
+  required string clientName = 2;
+}
+
+message OpReadBlockProto {
+  required ClientOperationHeaderProto header = 1;
+  required uint64 offset = 2;
+  required uint64 len = 3;
+}
+  
+message OpWriteBlockProto {
+  required ClientOperationHeaderProto header = 1;
+  repeated DatanodeInfoProto targets = 2;
+  optional DatanodeInfoProto source = 3;
+  enum BlockConstructionStage {
+    PIPELINE_SETUP_APPEND = 0;
+    // pipeline set up for failed PIPELINE_SETUP_APPEND recovery
+    PIPELINE_SETUP_APPEND_RECOVERY = 1;
+    // data streaming
+    DATA_STREAMING = 2;
+    // pipeline setup for failed data streaming recovery
+    PIPELINE_SETUP_STREAMING_RECOVERY = 3;
+    // close the block and pipeline
+    PIPELINE_CLOSE = 4;
+    // Recover a failed PIPELINE_CLOSE
+    PIPELINE_CLOSE_RECOVERY = 5;
+    // pipeline set up for block creation
+    PIPELINE_SETUP_CREATE = 6;
+    // transfer RBW for adding datanodes
+    TRANSFER_RBW = 7;
+    // transfer Finalized for adding datanodes
+    TRANSFER_FINALIZED = 8;
+  }
+  required BlockConstructionStage stage = 4;
+  required uint32 pipelineSize = 5;
+  required uint64 minBytesRcvd = 6;
+  required uint64 maxBytesRcvd = 7;
+  required uint64 latestGenerationStamp = 8;
+}
+  
+message OpTransferBlockProto {
+  required ClientOperationHeaderProto header = 1;
+  repeated DatanodeInfoProto targets = 2;
+}
+
+message OpReplaceBlockProto {
+  required BaseHeaderProto header = 1;
+  required string delHint = 2;
+  required DatanodeInfoProto source = 3;
+}
+
+message OpCopyBlockProto {
+  required BaseHeaderProto header = 1;
+}
+
+message OpBlockChecksumProto { 
+  required BaseHeaderProto header = 1;
+}
+
+
+message PacketHeaderProto {
+  // All fields must be fixed-length!
+  required sfixed64 offsetInBlock = 1;
+  required sfixed64 seqno = 2;
+  required bool lastPacketInBlock = 3;
+  required sfixed32 dataLen = 4;
+}
+
+enum Status {
+  SUCCESS = 0;
+  ERROR = 1;
+  ERROR_CHECKSUM = 2;
+  ERROR_INVALID = 3;
+  ERROR_EXISTS = 4;
+  ERROR_ACCESS_TOKEN = 5;
+  CHECKSUM_OK = 6;
+}
+
+message PipelineAckProto {
+  required sint64 seqno = 1;
+  repeated Status status = 2;
+}
+
+message BlockOpResponseProto {
+  required Status status = 1;
+
+  optional string firstBadLink = 2;
+  optional OpBlockChecksumResponseProto checksumResponse = 3;
+}
+
+/**
+ * Message sent from the client to the DN after the client has read the
+ * entire byte range it requested.
+ */
+message ClientReadStatusProto {
+  required Status status = 1;
+}
+
+message DNTransferAckProto {
+  required Status status = 1;
+}
+
+message OpBlockChecksumResponseProto {
+  required uint32 bytesPerCrc = 1;
+  required uint64 crcPerBlock = 2;
+  required bytes md5 = 3;
+}

Added: hadoop/hdfs/trunk/src/proto/hdfs.proto
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/proto/hdfs.proto?rev=1134492&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/proto/hdfs.proto (added)
+++ hadoop/hdfs/trunk/src/proto/hdfs.proto Sat Jun 11 00:36:12 2011
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// This file contains protocol buffers that are used throughout HDFS -- i.e.
+// by the client, server, and data transfer protocols.
+
+option java_package = "org.apache.hadoop.hdfs.protocol.proto";
+option java_outer_classname = "HdfsProtos";
+option java_generate_equals_and_hash = true;
+
+message ExtendedBlockProto {
+  required string poolId = 1;
+  required uint64 blockId = 2;
+  required uint64 numBytes = 3;
+  required uint64 generationStamp = 4;
+}
+
+message BlockTokenIdentifierProto {
+  required bytes identifier = 1;
+  required bytes password = 2;
+  required string kind = 3;
+  required string service = 4;
+}
+
+message DatanodeIDProto {
+  required string name = 1;
+  required string storageID = 2;
+  required uint32 infoPort = 3;
+}
+
+message DatanodeInfoProto {
+  required DatanodeIDProto id = 1;
+  optional uint64 capacity = 2;
+  optional uint64 dfsUsed = 3;
+  optional uint64 remaining = 4;
+  optional uint64 blockPoolUsed = 5;
+  optional uint64 lastUpdate = 6;
+  optional uint32 xceiverCount = 7;
+  optional string location = 8;
+  optional string hostName = 9;
+  enum AdminState {
+    NORMAL = 0;
+    DECOMMISSION_INPROGRESS = 1;
+    DECOMMISSIONED = 2;
+  }
+
+  optional AdminState adminState = 10;
+}
+

Modified: hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj?rev=1134492&r1=1134491&r2=1134492&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj (original)
+++ hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj Sat Jun 11 00:36:12 2011
@@ -17,9 +17,8 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 
 import org.apache.commons.logging.Log;
@@ -33,7 +32,6 @@ import org.apache.hadoop.hdfs.server.dat
 import org.apache.hadoop.hdfs.PipelinesTestUtil.PipelinesTest;
 import org.apache.hadoop.hdfs.PipelinesTestUtil.NodeBytes;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
@@ -83,7 +81,7 @@ privileged public aspect BlockReceiverAs
   }
 
   pointcut afterDownstreamStatusRead(BlockReceiver.PacketResponder responder):
-    call(void PipelineAck.readFields(DataInput)) && this(responder);
+    call(void PipelineAck.readFields(InputStream)) && this(responder);
 
   after(BlockReceiver.PacketResponder responder)
       throws IOException: afterDownstreamStatusRead(responder) {
@@ -150,7 +148,7 @@ privileged public aspect BlockReceiverAs
   }
   
   pointcut preventAckSending () :
-    call (void PipelineAck.write(DataOutput)) 
+    call (void PipelineAck.write(OutputStream)) 
     && within (PacketResponder);
 
   static int ackCounter = 0;
@@ -203,7 +201,7 @@ privileged public aspect BlockReceiverAs
   }
 
   pointcut pipelineAck(BlockReceiver.PacketResponder packetresponder) :
-    call (void PipelineAck.readFields(DataInput))
+    call (void PipelineAck.readFields(InputStream))
       && this(packetresponder);
 
   after(BlockReceiver.PacketResponder packetresponder) throws IOException

Modified: hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/DataTransferProtocolAspects.aj
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/DataTransferProtocolAspects.aj?rev=1134492&r1=1134491&r2=1134492&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/DataTransferProtocolAspects.aj (original)
+++ hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/DataTransferProtocolAspects.aj Sat Jun 11 00:36:12 2011
@@ -19,15 +19,16 @@ package org.apache.hadoop.hdfs.server.da
 
 import java.io.DataInput;
 import java.io.DataInputStream;
+import java.io.InputStream;
 import java.io.IOException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fi.DataTransferTestUtil;
 import org.apache.hadoop.fi.DataTransferTestUtil.DataTransferTest;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Op;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Receiver;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status;
 
 /** Aspect for DataTransferProtocol */
 public aspect DataTransferProtocolAspects {
@@ -53,9 +54,9 @@ public aspect DataTransferProtocolAspect
   }
 
   pointcut statusRead(DataXceiver dataxceiver):
-    call(Status Status.read(DataInput)) && this(dataxceiver);
+    call(BlockOpResponseProto BlockOpResponseProto.parseFrom(InputStream)) && this(dataxceiver);
 
-  after(DataXceiver dataxceiver) returning(Status status
+  after(DataXceiver dataxceiver) returning(BlockOpResponseProto status
       ) throws IOException: statusRead(dataxceiver) {
     final DataNode d = dataxceiver.getDataNode();
     LOG.info("FI: statusRead " + status + ", datanode="

Modified: hadoop/hdfs/trunk/src/test/findbugsExcludeFile.xml
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/findbugsExcludeFile.xml?rev=1134492&r1=1134491&r2=1134492&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/findbugsExcludeFile.xml (original)
+++ hadoop/hdfs/trunk/src/test/findbugsExcludeFile.xml Sat Jun 11 00:36:12 2011
@@ -3,6 +3,9 @@
        <Package name="org.apache.hadoop.record.compiler.generated" />
      </Match>
      <Match>
+       <Package name="org.apache.hadoop.hdfs.protocol.proto" />
+     </Match>
+     <Match>
        <Bug pattern="EI_EXPOSE_REP" />
      </Match>
      <Match>

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java?rev=1134492&r1=1134491&r2=1134492&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java Sat Jun 11 00:36:12 2011
@@ -57,6 +57,7 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -66,7 +67,6 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.ShellBasedUnixGroupsMapping;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -662,7 +662,7 @@ public class DFSTestUtil {
   }
 
   /** For {@link TestTransferRbw} */
-  public static DataTransferProtocol.Status transferRbw(final ExtendedBlock b, 
+  public static BlockOpResponseProto transferRbw(final ExtendedBlock b, 
       final DFSClient dfsClient, final DatanodeInfo... datanodes) throws IOException {
     assertEquals(2, datanodes.length);
     final Socket s = DFSOutputStream.createSocketForPipeline(datanodes[0],
@@ -678,6 +678,6 @@ public class DFSTestUtil {
         new DatanodeInfo[]{datanodes[1]}, new Token<BlockTokenIdentifier>());
     out.flush();
 
-    return DataTransferProtocol.Status.read(in);
+    return BlockOpResponseProto.parseDelimitedFrom(in);
   }
 }

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java?rev=1134492&r1=1134491&r2=1134492&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java Sat Jun 11 00:36:12 2011
@@ -20,8 +20,8 @@ package org.apache.hadoop.hdfs;
 
 import java.util.List;
 
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.fs.Path;
 
 import org.junit.Test;

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java?rev=1134492&r1=1134491&r2=1134492&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java Sat Jun 11 00:36:12 2011
@@ -20,9 +20,6 @@ package org.apache.hadoop.hdfs;
 import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Op.WRITE_BLOCK;
 import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
 import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.SUCCESS;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -46,11 +43,15 @@ import org.apache.hadoop.fs.FSDataOutput
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto.Builder;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
@@ -59,6 +60,7 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.StringUtils;
 import org.junit.Test;
 
 /**
@@ -93,6 +95,9 @@ public class TestDataTransferProtocol ex
       if ( testDescription != null ) {
         LOG.info("Testing : " + testDescription);
       }
+      LOG.info("Going to write:" +
+          StringUtils.byteToHexString(sendBuf.toByteArray()));
+      
       sock = new Socket();
       sock.connect(dnAddr, HdfsConstants.READ_TIMEOUT);
       sock.setSoTimeout(HdfsConstants.READ_TIMEOUT);
@@ -113,10 +118,11 @@ public class TestDataTransferProtocol ex
         }
         throw eof;
       }
-      for (int i=0; i<retBuf.length; i++) {
-        System.out.print(retBuf[i]);
-      }
-      System.out.println(":");
+
+      LOG.info("Received: " +
+          StringUtils.byteToHexString(retBuf));
+      LOG.info("Expected: " +
+          StringUtils.byteToHexString(recvBuf.toByteArray()));
       
       if (eofExpected) {
         throw new IOException("Did not recieve IOException when an exception " +
@@ -162,12 +168,22 @@ public class TestDataTransferProtocol ex
     sendOut.writeInt(0);           // zero checksum
 
     //ok finally write a block with 0 len
-    SUCCESS.write(recvOut);
-    Text.writeString(recvOut, "");
-    new PipelineAck(100, new Status[]{SUCCESS}).write(recvOut);
+    sendResponse(Status.SUCCESS, "", recvOut);
+    new PipelineAck(100, new Status[]{Status.SUCCESS}).write(recvOut);
     sendRecvData(description, false);
   }
   
+  private void sendResponse(Status status, String firstBadLink, // writes a BlockOpResponseProto to out
+      DataOutputStream out)
+  throws IOException {
+    Builder builder = BlockOpResponseProto.newBuilder().setStatus(status);
+    if (firstBadLink != null) { // callers pass null to leave firstBadLink unset
+      builder.setFirstBadLink(firstBadLink);
+    }
+    builder.build()
+      .writeDelimitedTo(out); // delimited form, matching the parseDelimitedFrom readers in this commit
+  }
+
   private void testWrite(ExtendedBlock block, BlockConstructionStage stage, long newGS,
       String description, Boolean eofExcepted) throws IOException {
     sendBuf.reset();
@@ -176,12 +192,11 @@ public class TestDataTransferProtocol ex
         stage, newGS, block.getNumBytes(), block.getNumBytes(), "cl", null,
         new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN);
     if (eofExcepted) {
-      ERROR.write(recvOut);
+      sendResponse(Status.ERROR, null, recvOut);
       sendRecvData(description, true);
     } else if (stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
       //ok finally write a block with 0 len
-      SUCCESS.write(recvOut);
-      Text.writeString(recvOut, ""); // first bad node
+      sendResponse(Status.SUCCESS, "", recvOut);
       sendRecvData(description, false);
     } else {
       writeZeroLengthPacket(block, description);
@@ -369,7 +384,7 @@ public class TestDataTransferProtocol ex
     // bad bytes per checksum
     sendOut.writeInt(-1-random.nextInt(oneMil));
     recvBuf.reset();
-    ERROR.write(recvOut);
+    sendResponse(Status.ERROR, null, recvOut);
     sendRecvData("wrong bytesPerChecksum while writing", true);
 
     sendBuf.reset();
@@ -389,9 +404,8 @@ public class TestDataTransferProtocol ex
       -1 - random.nextInt(oneMil)); // bad datalen
     hdr.write(sendOut);
 
-    SUCCESS.write(recvOut);
-    Text.writeString(recvOut, "");
-    new PipelineAck(100, new Status[]{ERROR}).write(recvOut);
+    sendResponse(Status.SUCCESS, "", recvOut);
+    new PipelineAck(100, new Status[]{Status.ERROR}).write(recvOut);
     sendRecvData("negative DATA_CHUNK len while writing block " + newBlockId, 
                  true);
 
@@ -415,9 +429,8 @@ public class TestDataTransferProtocol ex
     sendOut.writeInt(0);           // zero checksum
     sendOut.flush();
     //ok finally write a block with 0 len
-    SUCCESS.write(recvOut);
-    Text.writeString(recvOut, "");
-    new PipelineAck(100, new Status[]{SUCCESS}).write(recvOut);
+    sendResponse(Status.SUCCESS, "", recvOut);
+    new PipelineAck(100, new Status[]{Status.SUCCESS}).write(recvOut);
     sendRecvData("Writing a zero len block blockid " + newBlockId, false);
     
     /* Test OP_READ_BLOCK */
@@ -450,7 +463,7 @@ public class TestDataTransferProtocol ex
     
     // negative length is ok. Datanode assumes we want to read the whole block.
     recvBuf.reset();
-    SUCCESS.write(recvOut);    
+    sendResponse(Status.SUCCESS, null, recvOut);
     sendBuf.reset();
     DataTransferProtocol.Sender.opReadBlock(sendOut, blk, 0L, 
         -1 - random.nextInt(oneMil), "cl", BlockTokenSecretManager.DUMMY_TOKEN);
@@ -459,7 +472,7 @@ public class TestDataTransferProtocol ex
     
     // length is more than size of block.
     recvBuf.reset();
-    ERROR.write(recvOut);    
+    sendResponse(Status.ERROR, null, recvOut);
     sendBuf.reset();
     DataTransferProtocol.Sender.opReadBlock(sendOut, blk, 0L, 
         fileLen + 1, "cl", BlockTokenSecretManager.DUMMY_TOKEN);

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java?rev=1134492&r1=1134491&r2=1134492&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java Sat Jun 11 00:36:12 2011
@@ -17,8 +17,6 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.*;
-
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -46,6 +44,8 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.Util;
@@ -264,7 +264,9 @@ public class TestBlockReplacement extend
     // receiveResponse
     DataInputStream reply = new DataInputStream(sock.getInputStream());
 
-    return DataTransferProtocol.Status.read(reply) == SUCCESS;
+    BlockOpResponseProto proto =
+      BlockOpResponseProto.parseDelimitedFrom(reply);
+    return proto.getStatus() == Status.SUCCESS;
   }
 
   /**

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestTransferRbw.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestTransferRbw.java?rev=1134492&r1=1134491&r2=1134492&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestTransferRbw.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestTransferRbw.java Sat Jun 11 00:36:12 2011
@@ -33,6 +33,8 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.log4j.Level;
@@ -121,9 +123,9 @@ public class TestTransferRbw {
         //transfer RBW
         final ExtendedBlock b = new ExtendedBlock(bpid, oldrbw.getBlockId(), oldrbw.getBytesAcked(),
             oldrbw.getGenerationStamp());
-        final DataTransferProtocol.Status s = DFSTestUtil.transferRbw(
+        final BlockOpResponseProto s = DFSTestUtil.transferRbw(
             b, fs.getClient(), oldnodeinfo, newnodeinfo);
-        Assert.assertEquals(DataTransferProtocol.Status.SUCCESS, s);
+        Assert.assertEquals(Status.SUCCESS, s.getStatus());
       }
 
       //check new rbw

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlockTokenWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlockTokenWithDFS.java?rev=1134492&r1=1134491&r2=1134492&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlockTokenWithDFS.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlockTokenWithDFS.java Sat Jun 11 00:36:12 2011
@@ -43,6 +43,7 @@ import org.apache.hadoop.fs.FSDataOutput
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.token.*;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.log4j.Level;
 
 import junit.framework.TestCase;
@@ -142,7 +143,8 @@ public class TestBlockTokenWithDFS exten
             + "when it is expected to be valid", shouldSucceed);
         return;
       }
-      fail("OP_READ_BLOCK failed due to reasons other than access token");
+      fail("OP_READ_BLOCK failed due to reasons other than access token: "
+          + StringUtils.stringifyException(ex));
     } finally {
       if (s != null) {
         try {

Added: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/util/TestExactSizeInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/util/TestExactSizeInputStream.java?rev=1134492&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/util/TestExactSizeInputStream.java (added)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/util/TestExactSizeInputStream.java Sat Jun 11 00:36:12 2011
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.util;
+
+import static org.junit.Assert.*;
+
+import java.io.ByteArrayInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.hdfs.util.ExactSizeInputStream;
+import org.junit.Test;
+
+public class TestExactSizeInputStream { // exercises read, skip, available and mark on ExactSizeInputStream
+  @Test
+  public void testBasicsReadSingle() throws IOException {
+    ExactSizeInputStream s = new ExactSizeInputStream(byteStream("hello"), 3);
+    assertEquals(3, s.available());
+    
+    assertEquals((int)'h', s.read());
+    assertEquals((int)'e', s.read());
+    assertEquals((int)'l', s.read());
+    assertEquals(-1, s.read()); // size of 3 is used up, although "hello" has more bytes
+    assertEquals(0, s.available());
+  }
+  
+  @Test
+  public void testBasicsReadArray() throws IOException {
+    ExactSizeInputStream s = new ExactSizeInputStream(byteStream("hello"), 3);
+    assertEquals(3, s.available());
+    
+    byte[] buf = new byte[10];
+    
+    assertEquals(2, s.read(buf, 0, 2));
+    assertEquals('h', buf[0]);
+    assertEquals('e', buf[1]);
+    
+    assertEquals(1, s.read(buf, 0, 2)); // asked for 2, but only 1 byte remains within the size of 3
+    assertEquals('l', buf[0]);
+    
+    assertEquals(-1, s.read(buf, 0, 2));
+  }
+  
+  @Test
+  public void testBasicsSkip() throws IOException {
+    ExactSizeInputStream s = new ExactSizeInputStream(byteStream("hello"), 3);
+    assertEquals(3, s.available());
+    
+    assertEquals(2, s.skip(2));
+    assertEquals(1, s.skip(2)); // asked for 2, but only 1 byte remains within the size of 3
+    assertEquals(0, s.skip(2)); // nothing left to skip; no exception expected here
+  }
+  
+  @Test
+  public void testReadNotEnough() throws IOException {
+    // Stream is constructed with size 5, but the underlying data holds only 2 bytes
+    ExactSizeInputStream s = new ExactSizeInputStream(byteStream("he"), 5);
+    assertEquals(2, s.available());
+    
+    assertEquals((int)'h', s.read());
+    assertEquals((int)'e', s.read());
+    try {
+      s.read();
+      fail("Read when should be out of data");
+    } catch (EOFException e) {
+      // expected: the underlying data ended before the requested 5 bytes were read
+    }
+  }
+  
+  @Test
+  public void testSkipNotEnough() throws IOException {
+    // Stream is constructed with size 5, but the underlying data holds only 2 bytes
+    ExactSizeInputStream s = new ExactSizeInputStream(byteStream("he"), 5);
+    assertEquals(2, s.skip(3)); // only the 2 existing bytes are skipped
+    try {
+      s.skip(1);
+      fail("Skip when should be out of data");
+    } catch (EOFException e) {
+      // expected: the underlying data ended before the requested 5 bytes were consumed
+    }
+  }
+  
+  @Test
+  public void testReadArrayNotEnough() throws IOException {
+    // Stream is constructed with size 5, but the underlying data holds only 2 bytes
+    ExactSizeInputStream s = new ExactSizeInputStream(byteStream("he"), 5);
+    byte[] buf = new byte[10];
+    assertEquals(2, s.read(buf, 0, 5)); // short read: only the 2 existing bytes are returned
+    try {
+      s.read(buf, 2, 3);
+      fail("Read buf when should be out of data");
+    } catch (EOFException e) {
+      // expected: the underlying data ended before the requested 5 bytes were read
+    }
+  }
+
+  @Test
+  public void testMark() throws IOException {
+    ExactSizeInputStream s = new ExactSizeInputStream(byteStream("he"), 5);
+    assertFalse(s.markSupported());
+    try {
+      s.mark(1);
+      fail("Mark should not succeed");
+    } catch (UnsupportedOperationException uoe) {
+      // expected: mark() is rejected, consistent with markSupported() being false
+    }
+  }
+  
+  private static InputStream byteStream(String data) { // wraps the string's bytes in a ByteArrayInputStream
+    return new ByteArrayInputStream(data.getBytes());
+  }
+}