Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2011/06/11 02:36:13 UTC
svn commit: r1134492 [1/4] - in /hadoop/hdfs/trunk: ./
src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/
src/java/org/apache/hadoop/hdfs/protocol/proto/
src/java/org/apache/hadoop/hdfs/server/balancer/ src/java/org/apache/hadoo...
Author: todd
Date: Sat Jun 11 00:36:12 2011
New Revision: 1134492
URL: http://svn.apache.org/viewvc?rev=1134492&view=rev
Log:
HDFS-2058. Change Data Transfer wire protocol to use protocol buffers. Contributed by Todd Lipcon.
Added:
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtoUtil.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/HdfsProtoUtil.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/proto/
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/proto/DataTransferProtos.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/proto/HdfsProtos.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/util/ByteBufferOutputStream.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/util/ExactSizeInputStream.java
hadoop/hdfs/trunk/src/proto/
hadoop/hdfs/trunk/src/proto/datatransfer.proto
hadoop/hdfs/trunk/src/proto/hdfs.proto
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/util/TestExactSizeInputStream.java
Modified:
hadoop/hdfs/trunk/CHANGES.txt
hadoop/hdfs/trunk/build.xml
hadoop/hdfs/trunk/ivy.xml
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/BlockReader.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj
hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/DataTransferProtocolAspects.aj
hadoop/hdfs/trunk/src/test/findbugsExcludeFile.xml
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestTransferRbw.java
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlockTokenWithDFS.java
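
The pattern that recurs throughout this patch: every message is framed with protobuf's writeDelimitedTo() (a varint length prefix followed by the serialized bytes) and read back through HdfsProtoUtil.vintPrefixed(), which consumes the varint and bounds the stream to exactly one message. A minimal round-trip sketch, assuming only the generated ClientReadStatusProto added by this commit and stock protobuf-java 2.4 APIs (getStatus() is assumed to mirror the setStatus() used in BlockReader below):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;

    import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
    import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
    import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;

    public class FramingSketch {
      public static void main(String[] args) throws Exception {
        // Writer side: varint length prefix, then the message bytes.
        ByteArrayOutputStream wire = new ByteArrayOutputStream();
        ClientReadStatusProto.newBuilder()
            .setStatus(Status.CHECKSUM_OK)
            .build()
            .writeDelimitedTo(wire);

        // Reader side: vintPrefixed() eats the varint and returns a stream
        // bounded to the message length, so parseFrom() cannot overrun
        // into whatever follows on the socket.
        ClientReadStatusProto parsed = ClientReadStatusProto.parseFrom(
            HdfsProtoUtil.vintPrefixed(
                new ByteArrayInputStream(wire.toByteArray())));
        if (parsed.getStatus() != Status.CHECKSUM_OK) {
          throw new AssertionError("round trip lost the status");
        }
      }
    }
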
Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=1134492&r1=1134491&r2=1134492&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Sat Jun 11 00:36:12 2011
@@ -285,6 +285,9 @@ Trunk (unreleased changes)
layout to be consistent across the binary tgz, rpm, and deb.
(Eric Yang via omalley)
+ HDFS-2058. Change Data Transfer wire protocol to use protocol buffers.
+ (todd)
+
IMPROVEMENTS
HDFS-1875. MiniDFSCluster hard-codes dfs.datanode.address to localhost
Modified: hadoop/hdfs/trunk/build.xml
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/build.xml?rev=1134492&r1=1134491&r2=1134492&view=diff
==============================================================================
--- hadoop/hdfs/trunk/build.xml (original)
+++ hadoop/hdfs/trunk/build.xml Sat Jun 11 00:36:12 2011
@@ -40,6 +40,7 @@
<property name="src.dir" value="${basedir}/src"/>
<property name="java.src.dir" value="${src.dir}/java"/>
+ <property name="proto.src.dir" value="${src.dir}/proto"/>
<property name="anttasks.dir" value="${basedir}/src/ant"/>
<property name="lib.dir" value="${basedir}/lib"/>
<property name="conf.dir" value="${basedir}/conf"/>
@@ -201,6 +202,9 @@
<property name="build.dir.eclipse-test-classes" value="${build.dir.eclipse}/classes-test"/>
<property name="build.dir.eclipse-contrib-classes" value="${build.dir.eclipse}/classes-contrib"/>
+ <!-- Protoc properties -->
+ <property name="protoc" value="protoc" />
+
<property name="clover.jar" location="${clover.home}/lib/clover.jar"/>
<available property="clover.present" file="${clover.jar}" />
@@ -923,7 +927,9 @@
bottom="Copyright &copy; ${year} The Apache Software Foundation"
maxmemory="${javadoc.maxmemory}">
- <packageset dir="${java.src.dir}"/>
+ <packageset dir="${java.src.dir}">
+ <exclude name="org/apache/hadoop/hdfs/protocol/proto" />
+ </packageset>
<link href="${javadoc.link.java}"/>
<classpath >
<path refid="classpath" />
@@ -943,7 +949,9 @@
<param name="-apidir" value="${jdiff.xml.dir}"/>
<param name="-apiname" value="hadoop-hdfs ${version}"/>
</doclet>
- <packageset dir="src/java"/>
+ <packageset dir="${java.src.dir}">
+ <exclude name="org/apache/hadoop/hdfs/protocol/proto" />
+ </packageset>
<classpath >
<path refid="classpath" />
<path refid="jdiff-classpath" />
@@ -1365,6 +1373,8 @@
<exclude name="lib/jdiff/"/>
<exclude name="**/conf/*" />
<exclude name="webapps/**/WEB-INF/web.xml"/>
+ <!-- generated code for protobufs doesn't have headers -->
+ <exclude name="src/java/org/apache/hadoop/hdfs/protocol/proto/*Protos.java" />
<exclude name="src/docs/releasenotes.html" />
<exclude name="src/test/hdfs/org/apache/hadoop/cli/clitest_data/" />
<exclude name="src/test/hdfs/org/apache/hadoop/hdfs/tools/offlineEditsViewer/editsStored*" />
@@ -1672,6 +1682,27 @@
</copy>
</target>
+ <target name="generate-protos"
+ description="Generate Java code from protocol buffer definitions">
+ <exec executable="bash" resultproperty="which.protoc.result" outputproperty="">
+ <arg value="-c" />
+ <arg value="which ${protoc}" />
+ </exec>
+ <condition property="protoc.found">
+ <equals arg1="${which.protoc.result}" arg2="0" />
+ </condition>
+ <fail unless="protoc.found"
+ message="No protoc compiler found. Please pass -Dprotoc=/path/to/protoc if it is not on your path." />
+
+ <exec executable="${protoc}" failonerror="true">
+ <arg value="--java_out=${java.src.dir}" />
+ <arg value="--proto_path=${proto.src.dir}" />
+ <arg value="${proto.src.dir}/hdfs.proto" />
+ <arg value="${proto.src.dir}/datatransfer.proto" />
+ </exec>
+ <echo message="Generated protocol buffer code successfully." />
+ </target>
+
<target name="ivy-init-dirs">
<mkdir dir="${build.ivy.dir}" />
<mkdir dir="${build.ivy.lib.dir}" />
Modified: hadoop/hdfs/trunk/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/ivy.xml?rev=1134492&r1=1134491&r2=1134492&view=diff
==============================================================================
--- hadoop/hdfs/trunk/ivy.xml (original)
+++ hadoop/hdfs/trunk/ivy.xml Sat Jun 11 00:36:12 2011
@@ -67,6 +67,7 @@
<dependency org="commons-logging" name="commons-logging" rev="${commons-logging.version}" conf="compile->master"/>
<dependency org="commons-daemon" name="commons-daemon" rev="${commons-daemon.version}" conf="hdfs->default" />
<dependency org="log4j" name="log4j" rev="${log4j.version}" conf="common->master"/>
+ <dependency org="com.google.protobuf" name="protobuf-java" rev="2.4.0a" conf="common->master"/>
<dependency org="org.apache.hadoop" name="avro" rev="${avro.version}" conf="compile->master">
<exclude module="ant"/>
<exclude module="jetty"/>
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/BlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/BlockReader.java?rev=1134492&r1=1134491&r2=1134492&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/BlockReader.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/BlockReader.java Sat Jun 11 00:36:12 2011
@@ -17,10 +17,6 @@
*/
package org.apache.hadoop.hdfs;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.CHECKSUM_OK;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR_ACCESS_TOKEN;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.SUCCESS;
-
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
@@ -37,6 +33,10 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
+import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
@@ -45,6 +45,9 @@ import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
+import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed;
+
+
/** This is a wrapper around a connection to a datanode
* and understands checksum, offset, etc.
*
@@ -138,9 +141,9 @@ public class BlockReader extends FSInput
// if eos was set in the previous read, send a status code to the DN
if (eos && !eosBefore && nRead >= 0) {
if (needChecksum()) {
- sendReadResult(dnSock, CHECKSUM_OK);
+ sendReadResult(dnSock, Status.CHECKSUM_OK);
} else {
- sendReadResult(dnSock, SUCCESS);
+ sendReadResult(dnSock, Status.SUCCESS);
}
}
return nRead;
@@ -313,20 +316,13 @@ public class BlockReader extends FSInput
pos + bytesToRead >= bytesNeededToFinish) {
// Read header
- int packetLen = in.readInt();
- long offsetInBlock = in.readLong();
- long seqno = in.readLong();
- boolean lastPacketInBlock = in.readBoolean();
- int dataLen = in.readInt();
+ PacketHeader hdr = new PacketHeader();
+ hdr.readFields(in);
- if (!lastPacketInBlock ||
- dataLen != 0) {
+ if (!hdr.isLastPacketInBlock() ||
+ hdr.getDataLen() != 0) {
throw new IOException("Expected empty end-of-read packet! Header: " +
- "(packetLen : " + packetLen +
- ", offsetInBlock : " + offsetInBlock +
- ", seqno : " + seqno +
- ", lastInBlock : " + lastPacketInBlock +
- ", dataLen : " + dataLen);
+ hdr);
}
eos = true;
@@ -422,9 +418,10 @@ public class BlockReader extends FSInput
new BufferedInputStream(NetUtils.getInputStream(sock),
bufferSize));
- DataTransferProtocol.Status status = DataTransferProtocol.Status.read(in);
- if (status != SUCCESS) {
- if (status == ERROR_ACCESS_TOKEN) {
+ BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
+ vintPrefixed(in));
+ if (status.getStatus() != Status.SUCCESS) {
+ if (status.getStatus() == Status.ERROR_ACCESS_TOKEN) {
throw new InvalidBlockTokenException(
"Got access token error for OP_READ_BLOCK, self="
+ sock.getLocalSocketAddress() + ", remote="
@@ -499,11 +496,16 @@ public class BlockReader extends FSInput
* closing our connection (which we will re-open), but won't affect
* data correctness.
*/
- void sendReadResult(Socket sock, DataTransferProtocol.Status statusCode) {
+ void sendReadResult(Socket sock, Status statusCode) {
assert !sentStatusCode : "already sent status code to " + sock;
try {
OutputStream out = NetUtils.getOutputStream(sock, HdfsConstants.WRITE_TIMEOUT);
- statusCode.writeOutputStream(out);
+
+ ClientReadStatusProto.newBuilder()
+ .setStatus(statusCode)
+ .build()
+ .writeDelimitedTo(out);
+
out.flush();
sentStatusCode = true;
} catch (IOException e) {
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1134492&r1=1134491&r2=1134492&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java Sat Jun 11 00:36:12 2011
@@ -19,8 +19,6 @@
package org.apache.hadoop.hdfs;
import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Op.BLOCK_CHECKSUM;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR_ACCESS_TOKEN;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.SUCCESS;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
@@ -75,7 +73,11 @@ import org.apache.hadoop.hdfs.protocol.H
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
+import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
@@ -1118,9 +1120,11 @@ public class DFSClient implements FSCons
DataTransferProtocol.Sender.opBlockChecksum(out, block,
lb.getBlockToken());
- final DataTransferProtocol.Status reply = DataTransferProtocol.Status.read(in);
- if (reply != SUCCESS) {
- if (reply == ERROR_ACCESS_TOKEN
+ final BlockOpResponseProto reply =
+ BlockOpResponseProto.parseFrom(HdfsProtoUtil.vintPrefixed(in));
+
+ if (reply.getStatus() != Status.SUCCESS) {
+ if (reply.getStatus() == Status.ERROR_ACCESS_TOKEN
&& i > lastRetriedIndex) {
if (LOG.isDebugEnabled()) {
LOG.debug("Got access token error in response to OP_BLOCK_CHECKSUM "
@@ -1138,9 +1142,12 @@ public class DFSClient implements FSCons
+ block + " from datanode " + datanodes[j].getName());
}
}
+
+ OpBlockChecksumResponseProto checksumData =
+ reply.getChecksumResponse();
//read byte-per-checksum
- final int bpc = in.readInt();
+ final int bpc = checksumData.getBytesPerCrc();
if (i == 0) { //first block
bytesPerCRC = bpc;
}
@@ -1150,13 +1157,14 @@ public class DFSClient implements FSCons
}
//read crc-per-block
- final long cpb = in.readLong();
+ final long cpb = checksumData.getCrcPerBlock();
if (locatedblocks.size() > 1 && i == 0) {
crcPerBlock = cpb;
}
//read md5
- final MD5Hash md5 = MD5Hash.read(in);
+ final MD5Hash md5 = new MD5Hash(
+ checksumData.getMd5().toByteArray());
md5.write(md5out);
done = true;
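
For completeness, the datanode-side counterpart to this hunk populates the same nested message the client unpacks here. A hedged sketch (the builder setters are assumed to mirror the getters used above; the authoritative message shapes live in src/proto/datatransfer.proto, added by this commit):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;

    import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
    import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
    import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
    import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
    import org.apache.hadoop.io.MD5Hash;

    import com.google.protobuf.ByteString;

    public class ChecksumReplySketch {
      public static void main(String[] args) throws Exception {
        // Datanode side (sketch): nest the checksum data inside the generic
        // block-op response and length-delimit it onto the wire.
        ByteArrayOutputStream wire = new ByteArrayOutputStream();
        BlockOpResponseProto.newBuilder()
            .setStatus(Status.SUCCESS)
            .setChecksumResponse(OpBlockChecksumResponseProto.newBuilder()
                .setBytesPerCrc(512)
                .setCrcPerBlock(128L)
                .setMd5(ByteString.copyFrom(new byte[16])) // real code sends the computed digest
                .build())
            .build()
            .writeDelimitedTo(wire);

        // Client side: the exact sequence used in the hunk above.
        BlockOpResponseProto reply = BlockOpResponseProto.parseFrom(
            HdfsProtoUtil.vintPrefixed(
                new ByteArrayInputStream(wire.toByteArray())));
        OpBlockChecksumResponseProto checksumData = reply.getChecksumResponse();
        MD5Hash md5 = new MD5Hash(checksumData.getMd5().toByteArray());
        System.out.println(checksumData.getBytesPerCrc() + " bytes/crc, md5=" + md5);
      }
    }
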
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1134492&r1=1134491&r2=1134492&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java Sat Jun 11 00:36:12 2011
@@ -17,8 +17,9 @@
*/
package org.apache.hadoop.hdfs;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR_ACCESS_TOKEN;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.SUCCESS;
+import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.SUCCESS;
+import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR;
+
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
@@ -56,7 +57,10 @@ import org.apache.hadoop.hdfs.protocol.E
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
+import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -75,6 +79,8 @@ import org.apache.hadoop.util.Progressab
import org.apache.hadoop.util.PureJavaCrc32;
import org.apache.hadoop.util.StringUtils;
+
+
/****************************************************************
* DFSOutputStream creates files from a stream of bytes.
*
@@ -650,7 +656,7 @@ class DFSOutputStream extends FSOutputSu
long seqno = ack.getSeqno();
// processes response status from datanodes.
for (int i = ack.getNumOfReplies()-1; i >=0 && dfsClient.clientRunning; i--) {
- final DataTransferProtocol.Status reply = ack.getReply(i);
+ final Status reply = ack.getReply(i);
if (reply != SUCCESS) {
errorIndex = i; // first bad datanode
throw new IOException("Bad response " + reply +
@@ -848,7 +854,9 @@ class DFSOutputStream extends FSOutputSu
//ack
in = new DataInputStream(NetUtils.getInputStream(sock));
- if (SUCCESS != DataTransferProtocol.Status.read(in)) {
+ BlockOpResponseProto response =
+ BlockOpResponseProto.parseFrom(HdfsProtoUtil.vintPrefixed(in));
+ if (SUCCESS != response.getStatus()) {
throw new IOException("Failed to add a datanode");
}
} finally {
@@ -990,7 +998,7 @@ class DFSOutputStream extends FSOutputSu
//
private boolean createBlockOutputStream(DatanodeInfo[] nodes, long newGS,
boolean recoveryFlag) {
- DataTransferProtocol.Status pipelineStatus = SUCCESS;
+ Status pipelineStatus = SUCCESS;
String firstBadLink = "";
if (DFSClient.LOG.isDebugEnabled()) {
for (int i = 0; i < nodes.length; i++) {
@@ -1023,10 +1031,13 @@ class DFSOutputStream extends FSOutputSu
out.flush();
// receive ack for connect
- pipelineStatus = DataTransferProtocol.Status.read(blockReplyStream);
- firstBadLink = Text.readString(blockReplyStream);
+ BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
+ HdfsProtoUtil.vintPrefixed(blockReplyStream));
+ pipelineStatus = resp.getStatus();
+ firstBadLink = resp.getFirstBadLink();
+
if (pipelineStatus != SUCCESS) {
- if (pipelineStatus == ERROR_ACCESS_TOKEN) {
+ if (pipelineStatus == Status.ERROR_ACCESS_TOKEN) {
throw new InvalidBlockTokenException(
"Got access token error for connect ack with firstBadLink as "
+ firstBadLink);
Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtoUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtoUtil.java?rev=1134492&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtoUtil.java (added)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtoUtil.java Sat Jun 11 00:36:12 2011
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol;
+
+
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.security.token.Token;
+
+
+/**
+ * Static utilities for dealing with the protocol buffers used by the
+ * Data Transfer Protocol.
+ */
+abstract class DataTransferProtoUtil {
+
+ static DataTransferProtocol.BlockConstructionStage fromProto(
+ OpWriteBlockProto.BlockConstructionStage stage) {
+ return BlockConstructionStage.valueOf(BlockConstructionStage.class,
+ stage.name());
+ }
+
+ static OpWriteBlockProto.BlockConstructionStage toProto(
+ BlockConstructionStage stage) {
+ return OpWriteBlockProto.BlockConstructionStage.valueOf(
+ stage.name());
+ }
+
+ static ClientOperationHeaderProto buildClientHeader(ExtendedBlock blk,
+ String client, Token<BlockTokenIdentifier> blockToken) {
+ ClientOperationHeaderProto header =
+ ClientOperationHeaderProto.newBuilder()
+ .setBaseHeader(buildBaseHeader(blk, blockToken))
+ .setClientName(client)
+ .build();
+ return header;
+ }
+
+ static BaseHeaderProto buildBaseHeader(ExtendedBlock blk,
+ Token<BlockTokenIdentifier> blockToken) {
+ return BaseHeaderProto.newBuilder()
+ .setBlock(HdfsProtoUtil.toProto(blk))
+ .setToken(HdfsProtoUtil.toProto(blockToken))
+ .build();
+ }
+}
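
The two converters above work because the hand-written enum and the generated proto enum deliberately share constant names, so valueOf(name) maps between them without a hand-maintained switch; the trade-off is that renaming a constant on either side now fails at runtime rather than at compile time. A round-trip sketch from within the same package (both methods are package-private; PIPELINE_SETUP_CREATE is assumed here to be one of the stage names, which this hunk does not list):

    // In package org.apache.hadoop.hdfs.protocol, alongside the class above.
    DataTransferProtocol.BlockConstructionStage stage =
        DataTransferProtocol.BlockConstructionStage.PIPELINE_SETUP_CREATE;

    // Java enum -> proto enum -> Java enum; both hops look up by constant name.
    OpWriteBlockProto.BlockConstructionStage onWire =
        DataTransferProtoUtil.toProto(stage);
    assert DataTransferProtoUtil.fromProto(onWire) == stage;
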
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java?rev=1134492&r1=1134491&r2=1134492&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java Sat Jun 11 00:36:12 2011
@@ -22,8 +22,10 @@ import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import java.util.Arrays;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -32,12 +34,31 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpTransferBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PacketHeaderProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.hdfs.util.ByteBufferOutputStream;
import org.apache.hadoop.security.token.Token;
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtoUtil.fromProto;
+import static org.apache.hadoop.hdfs.protocol.DataTransferProtoUtil.toProto;
+import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.fromProto;
+import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.fromProtos;
+import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.toProto;
+import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.toProtos;
+import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed;
+
+import com.google.protobuf.Message;
+
/**
* Transfer data to/from datanode using a streaming protocol.
*/
@@ -89,373 +110,8 @@ public interface DataTransferProtocol {
public void write(DataOutput out) throws IOException {
out.write(code);
}
-
- /** Base class for all headers. */
- private static abstract class BaseHeader implements Writable {
- private ExtendedBlock block;
- private Token<BlockTokenIdentifier> blockToken;
-
- private BaseHeader() {}
-
- private BaseHeader(
- final ExtendedBlock block,
- final Token<BlockTokenIdentifier> blockToken) {
- this.block = block;
- this.blockToken = blockToken;
- }
-
- /** @return the extended block. */
- public final ExtendedBlock getBlock() {
- return block;
- }
-
- /** @return the block token. */
- public final Token<BlockTokenIdentifier> getBlockToken() {
- return blockToken;
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- block.writeId(out);
- blockToken.write(out);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- block = new ExtendedBlock();
- block.readId(in);
-
- blockToken = new Token<BlockTokenIdentifier>();
- blockToken.readFields(in);
- }
- }
-
- /** Base header for all client operation. */
- private static abstract class ClientOperationHeader extends BaseHeader {
- private String clientName;
-
- private ClientOperationHeader() {}
-
- private ClientOperationHeader(
- final ExtendedBlock block,
- final Token<BlockTokenIdentifier> blockToken,
- final String clientName) {
- super(block, blockToken);
- this.clientName = clientName;
- }
-
- /** @return client name. */
- public final String getClientName() {
- return clientName;
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- super.write(out);
- Text.writeString(out, clientName);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- super.readFields(in);
- clientName = Text.readString(in);
- }
- }
-
- /** {@link Op#READ_BLOCK} header. */
- public static class ReadBlockHeader extends ClientOperationHeader {
- private long offset;
- private long length;
-
- /** Default constructor */
- public ReadBlockHeader() {}
-
- /** Constructor with all parameters */
- public ReadBlockHeader(
- final ExtendedBlock blk,
- final Token<BlockTokenIdentifier> blockToken,
- final String clientName,
- final long offset,
- final long length) {
- super(blk, blockToken, clientName);
- this.offset = offset;
- this.length = length;
- }
-
- /** @return the offset */
- public long getOffset() {
- return offset;
- }
-
- /** @return the length */
- public long getLength() {
- return length;
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- super.write(out);
- out.writeLong(offset);
- out.writeLong(length);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- super.readFields(in);
- offset = in.readLong();
- length = in.readLong();
- }
- }
-
- /** {@link Op#WRITE_BLOCK} header. */
- public static class WriteBlockHeader extends ClientOperationHeader {
- private DatanodeInfo[] targets;
-
- private DatanodeInfo source;
- private BlockConstructionStage stage;
- private int pipelineSize;
- private long minBytesRcvd;
- private long maxBytesRcvd;
- private long latestGenerationStamp;
-
- /** Default constructor */
- public WriteBlockHeader() {}
-
- /** Constructor with all parameters */
- public WriteBlockHeader(
- final ExtendedBlock blk,
- final Token<BlockTokenIdentifier> blockToken,
- final String clientName,
- final DatanodeInfo[] targets,
- final DatanodeInfo source,
- final BlockConstructionStage stage,
- final int pipelineSize,
- final long minBytesRcvd,
- final long maxBytesRcvd,
- final long latestGenerationStamp
- ) throws IOException {
- super(blk, blockToken, clientName);
- this.targets = targets;
- this.source = source;
- this.stage = stage;
- this.pipelineSize = pipelineSize;
- this.minBytesRcvd = minBytesRcvd;
- this.maxBytesRcvd = maxBytesRcvd;
- this.latestGenerationStamp = latestGenerationStamp;
- }
-
- /** @return targets. */
- public DatanodeInfo[] getTargets() {
- return targets;
- }
-
- /** @return the source */
- public DatanodeInfo getSource() {
- return source;
- }
-
- /** @return the stage */
- public BlockConstructionStage getStage() {
- return stage;
- }
-
- /** @return the pipeline size */
- public int getPipelineSize() {
- return pipelineSize;
- }
-
- /** @return the minimum bytes received. */
- public long getMinBytesRcvd() {
- return minBytesRcvd;
- }
-
- /** @return the maximum bytes received. */
- public long getMaxBytesRcvd() {
- return maxBytesRcvd;
- }
-
- /** @return the latest generation stamp */
- public long getLatestGenerationStamp() {
- return latestGenerationStamp;
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- super.write(out);
- Sender.write(out, 1, targets);
-
- out.writeBoolean(source != null);
- if (source != null) {
- source.write(out);
- }
-
- stage.write(out);
- out.writeInt(pipelineSize);
- WritableUtils.writeVLong(out, minBytesRcvd);
- WritableUtils.writeVLong(out, maxBytesRcvd);
- WritableUtils.writeVLong(out, latestGenerationStamp);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- super.readFields(in);
- targets = Receiver.readDatanodeInfos(in);
-
- source = in.readBoolean()? DatanodeInfo.read(in): null;
- stage = BlockConstructionStage.readFields(in);
- pipelineSize = in.readInt(); // num of datanodes in entire pipeline
- minBytesRcvd = WritableUtils.readVLong(in);
- maxBytesRcvd = WritableUtils.readVLong(in);
- latestGenerationStamp = WritableUtils.readVLong(in);
- }
- }
-
- /** {@link Op#TRANSFER_BLOCK} header. */
- public static class TransferBlockHeader extends ClientOperationHeader {
- private DatanodeInfo[] targets;
-
- /** Default constructor */
- public TransferBlockHeader() {}
-
- /** Constructor with all parameters */
- public TransferBlockHeader(
- final ExtendedBlock blk,
- final Token<BlockTokenIdentifier> blockToken,
- final String clientName,
- final DatanodeInfo[] targets) throws IOException {
- super(blk, blockToken, clientName);
- this.targets = targets;
- }
-
- /** @return targets. */
- public DatanodeInfo[] getTargets() {
- return targets;
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- super.write(out);
- Sender.write(out, 0, targets);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- super.readFields(in);
- targets = Receiver.readDatanodeInfos(in);
- }
- }
-
- /** {@link Op#REPLACE_BLOCK} header. */
- public static class ReplaceBlockHeader extends BaseHeader {
- private String delHint;
- private DatanodeInfo source;
-
- /** Default constructor */
- public ReplaceBlockHeader() {}
-
- /** Constructor with all parameters */
- public ReplaceBlockHeader(final ExtendedBlock blk,
- final Token<BlockTokenIdentifier> blockToken,
- final String storageId,
- final DatanodeInfo src) throws IOException {
- super(blk, blockToken);
- this.delHint = storageId;
- this.source = src;
- }
-
- /** @return delete-hint. */
- public String getDelHint() {
- return delHint;
- }
-
- /** @return source datanode. */
- public DatanodeInfo getSource() {
- return source;
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- super.write(out);
- Text.writeString(out, delHint);
- source.write(out);
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- super.readFields(in);
- delHint = Text.readString(in);
- source = DatanodeInfo.read(in);
- }
- }
-
- /** {@link Op#COPY_BLOCK} header. */
- public static class CopyBlockHeader extends BaseHeader {
- /** Default constructor */
- public CopyBlockHeader() {}
-
- /** Constructor with all parameters */
- public CopyBlockHeader(
- final ExtendedBlock block,
- final Token<BlockTokenIdentifier> blockToken) {
- super(block, blockToken);
- }
- }
-
- /** {@link Op#BLOCK_CHECKSUM} header. */
- public static class BlockChecksumHeader extends BaseHeader {
- /** Default constructor */
- public BlockChecksumHeader() {}
-
- /** Constructor with all parameters */
- public BlockChecksumHeader(
- final ExtendedBlock block,
- final Token<BlockTokenIdentifier> blockToken) {
- super(block, blockToken);
- }
- }
}
-
-
- /** Status */
- public enum Status {
- SUCCESS(0),
- ERROR(1),
- ERROR_CHECKSUM(2),
- ERROR_INVALID(3),
- ERROR_EXISTS(4),
- ERROR_ACCESS_TOKEN(5),
- CHECKSUM_OK(6);
-
- /** The code for this operation. */
- private final int code;
- private Status(int code) {
- this.code = code;
- }
-
- private static final int FIRST_CODE = values()[0].code;
- /** Return the object represented by the code. */
- private static Status valueOf(int code) {
- final int i = code - FIRST_CODE;
- return i < 0 || i >= values().length? null: values()[i];
- }
-
- /** Read from in */
- public static Status read(DataInput in) throws IOException {
- return valueOf(in.readShort());
- }
-
- /** Write to out */
- public void write(DataOutput out) throws IOException {
- out.writeShort(code);
- }
-
- /** Write to out */
- public void writeOutputStream(OutputStream out) throws IOException {
- out.write(new byte[] {(byte)(code >>> 8), (byte)code});
- }
- };
-
public enum BlockConstructionStage {
/** The enum values are always listed as the regular stage followed by the
* recovery stage.
@@ -492,23 +148,9 @@ public interface DataTransferProtocol {
return values()[ordinal()|RECOVERY_BIT];
}
}
-
- private static BlockConstructionStage valueOf(byte code) {
- return code < 0 || code >= values().length? null: values()[code];
- }
-
- /** Read from in */
- private static BlockConstructionStage readFields(DataInput in)
- throws IOException {
- return valueOf(in.readByte());
- }
-
- /** write to out */
- private void write(DataOutput out) throws IOException {
- out.writeByte(ordinal());
- }
}
+
/** Sender */
@InterfaceAudience.Private
@InterfaceStability.Evolving
@@ -520,11 +162,10 @@ public interface DataTransferProtocol {
op.write(out);
}
- /** Send an operation request. */
private static void send(final DataOutputStream out, final Op opcode,
- final Op.BaseHeader parameters) throws IOException {
+ final Message proto) throws IOException {
op(out, opcode);
- parameters.write(out);
+ proto.writeDelimitedTo(out);
out.flush();
}
@@ -533,59 +174,90 @@ public interface DataTransferProtocol {
long blockOffset, long blockLen, String clientName,
Token<BlockTokenIdentifier> blockToken)
throws IOException {
- send(out, Op.READ_BLOCK, new Op.ReadBlockHeader(blk, blockToken,
- clientName, blockOffset, blockLen));
+
+ OpReadBlockProto proto = OpReadBlockProto.newBuilder()
+ .setHeader(DataTransferProtoUtil.buildClientHeader(blk, clientName, blockToken))
+ .setOffset(blockOffset)
+ .setLen(blockLen)
+ .build();
+
+ send(out, Op.READ_BLOCK, proto);
}
+
/** Send OP_WRITE_BLOCK */
public static void opWriteBlock(DataOutputStream out, ExtendedBlock blk,
int pipelineSize, BlockConstructionStage stage, long newGs,
long minBytesRcvd, long maxBytesRcvd, String client, DatanodeInfo src,
DatanodeInfo[] targets, Token<BlockTokenIdentifier> blockToken)
throws IOException {
- send(out, Op.WRITE_BLOCK, new Op.WriteBlockHeader(blk, blockToken,
- client, targets, src, stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
- newGs));
+ ClientOperationHeaderProto header = DataTransferProtoUtil.buildClientHeader(blk, client,
+ blockToken);
+
+ OpWriteBlockProto.Builder proto = OpWriteBlockProto.newBuilder()
+ .setHeader(header)
+ .addAllTargets(
+ toProtos(targets, 1))
+ .setStage(toProto(stage))
+ .setPipelineSize(pipelineSize)
+ .setMinBytesRcvd(minBytesRcvd)
+ .setMaxBytesRcvd(maxBytesRcvd)
+ .setLatestGenerationStamp(newGs);
+
+ if (src != null) {
+ proto.setSource(toProto(src));
+ }
+
+ send(out, Op.WRITE_BLOCK, proto.build());
}
/** Send {@link Op#TRANSFER_BLOCK} */
public static void opTransferBlock(DataOutputStream out, ExtendedBlock blk,
String client, DatanodeInfo[] targets,
Token<BlockTokenIdentifier> blockToken) throws IOException {
- send(out, Op.TRANSFER_BLOCK, new Op.TransferBlockHeader(blk, blockToken,
- client, targets));
+
+ OpTransferBlockProto proto = OpTransferBlockProto.newBuilder()
+ .setHeader(DataTransferProtoUtil.buildClientHeader(
+ blk, client, blockToken))
+ .addAllTargets(toProtos(targets, 0))
+ .build();
+
+ send(out, Op.TRANSFER_BLOCK, proto);
}
/** Send OP_REPLACE_BLOCK */
public static void opReplaceBlock(DataOutputStream out,
ExtendedBlock blk, String delHint, DatanodeInfo src,
Token<BlockTokenIdentifier> blockToken) throws IOException {
- send(out, Op.REPLACE_BLOCK, new Op.ReplaceBlockHeader(blk, blockToken,
- delHint, src));
+ OpReplaceBlockProto proto = OpReplaceBlockProto.newBuilder()
+ .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken))
+ .setDelHint(delHint)
+ .setSource(toProto(src))
+ .build();
+
+ send(out, Op.REPLACE_BLOCK, proto);
}
/** Send OP_COPY_BLOCK */
public static void opCopyBlock(DataOutputStream out, ExtendedBlock blk,
Token<BlockTokenIdentifier> blockToken)
throws IOException {
- send(out, Op.COPY_BLOCK, new Op.CopyBlockHeader(blk, blockToken));
+ OpCopyBlockProto proto = OpCopyBlockProto.newBuilder()
+ .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken))
+ .build();
+
+ send(out, Op.COPY_BLOCK, proto);
}
/** Send OP_BLOCK_CHECKSUM */
public static void opBlockChecksum(DataOutputStream out, ExtendedBlock blk,
Token<BlockTokenIdentifier> blockToken)
throws IOException {
- send(out, Op.BLOCK_CHECKSUM, new Op.BlockChecksumHeader(blk, blockToken));
- }
-
- /** Write an array of {@link DatanodeInfo} */
- private static void write(final DataOutput out,
- final int start,
- final DatanodeInfo[] datanodeinfos) throws IOException {
- out.writeInt(datanodeinfos.length - start);
- for (int i = start; i < datanodeinfos.length; i++) {
- datanodeinfos[i].write(out);
- }
+ OpBlockChecksumProto proto = OpBlockChecksumProto.newBuilder()
+ .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken))
+ .build();
+
+ send(out, Op.BLOCK_CHECKSUM, proto);
}
}
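
The net effect on callers: issuing an operation is now just assembling arguments; all serialization happens behind the Sender. A sketch of sending OP_READ_BLOCK on an open socket sock (the four-argument ExtendedBlock constructor and the no-argument Token constructor both appear elsewhere in this patch; offset and length are illustrative):

    DataOutputStream out = new DataOutputStream(
        new BufferedOutputStream(
            NetUtils.getOutputStream(sock, HdfsConstants.WRITE_TIMEOUT)));

    // (blockPoolId, blockId, numBytes, generationStamp)
    ExtendedBlock blk = new ExtendedBlock("BP-sketch", 1234L, 4096L, 42L);
    // Empty token, as when block access tokens are disabled.
    Token<BlockTokenIdentifier> token = new Token<BlockTokenIdentifier>();

    // Writes the opcode, then the varint-delimited OpReadBlockProto, and flushes.
    DataTransferProtocol.Sender.opReadBlock(
        out, blk, 0L, 4096L, "sketch-client", token);
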
@@ -631,12 +303,16 @@ public interface DataTransferProtocol {
/** Receive OP_READ_BLOCK */
private void opReadBlock(DataInputStream in) throws IOException {
- final Op.ReadBlockHeader h = new Op.ReadBlockHeader();
- h.readFields(in);
- opReadBlock(in, h.getBlock(), h.getOffset(), h.getLength(),
- h.getClientName(), h.getBlockToken());
- }
+ OpReadBlockProto proto = OpReadBlockProto.parseFrom(vintPrefixed(in));
+
+ ExtendedBlock b = fromProto(
+ proto.getHeader().getBaseHeader().getBlock());
+ Token<BlockTokenIdentifier> token = fromProto(
+ proto.getHeader().getBaseHeader().getToken());
+ opReadBlock(in, b, proto.getOffset(), proto.getLen(),
+ proto.getHeader().getClientName(), token);
+ }
/**
* Abstract OP_READ_BLOCK method. Read a block.
*/
@@ -646,12 +322,17 @@ public interface DataTransferProtocol {
/** Receive OP_WRITE_BLOCK */
private void opWriteBlock(DataInputStream in) throws IOException {
- final Op.WriteBlockHeader h = new Op.WriteBlockHeader();
- h.readFields(in);
- opWriteBlock(in, h.getBlock(), h.getPipelineSize(), h.getStage(),
- h.getLatestGenerationStamp(),
- h.getMinBytesRcvd(), h.getMaxBytesRcvd(),
- h.getClientName(), h.getSource(), h.getTargets(), h.getBlockToken());
+ final OpWriteBlockProto proto = OpWriteBlockProto.parseFrom(vintPrefixed(in));
+ opWriteBlock(in,
+ fromProto(proto.getHeader().getBaseHeader().getBlock()),
+ proto.getPipelineSize(),
+ fromProto(proto.getStage()),
+ proto.getLatestGenerationStamp(),
+ proto.getMinBytesRcvd(), proto.getMaxBytesRcvd(),
+ proto.getHeader().getClientName(),
+ fromProto(proto.getSource()),
+ fromProtos(proto.getTargetsList()),
+ fromProto(proto.getHeader().getBaseHeader().getToken()));
}
/**
@@ -666,10 +347,14 @@ public interface DataTransferProtocol {
/** Receive {@link Op#TRANSFER_BLOCK} */
private void opTransferBlock(DataInputStream in) throws IOException {
- final Op.TransferBlockHeader h = new Op.TransferBlockHeader();
- h.readFields(in);
- opTransferBlock(in, h.getBlock(), h.getClientName(), h.getTargets(),
- h.getBlockToken());
+ final OpTransferBlockProto proto =
+ OpTransferBlockProto.parseFrom(vintPrefixed(in));
+
+ opTransferBlock(in,
+ fromProto(proto.getHeader().getBaseHeader().getBlock()),
+ proto.getHeader().getClientName(),
+ fromProtos(proto.getTargetsList()),
+ fromProto(proto.getHeader().getBaseHeader().getToken()));
}
/**
@@ -684,10 +369,13 @@ public interface DataTransferProtocol {
/** Receive OP_REPLACE_BLOCK */
private void opReplaceBlock(DataInputStream in) throws IOException {
- final Op.ReplaceBlockHeader h = new Op.ReplaceBlockHeader();
- h.readFields(in);
- opReplaceBlock(in, h.getBlock(), h.getDelHint(), h.getSource(),
- h.getBlockToken());
+ OpReplaceBlockProto proto = OpReplaceBlockProto.parseFrom(vintPrefixed(in));
+
+ opReplaceBlock(in,
+ fromProto(proto.getHeader().getBlock()),
+ proto.getDelHint(),
+ fromProto(proto.getSource()),
+ fromProto(proto.getHeader().getToken()));
}
/**
@@ -700,9 +388,11 @@ public interface DataTransferProtocol {
/** Receive OP_COPY_BLOCK */
private void opCopyBlock(DataInputStream in) throws IOException {
- final Op.CopyBlockHeader h = new Op.CopyBlockHeader();
- h.readFields(in);
- opCopyBlock(in, h.getBlock(), h.getBlockToken());
+ OpCopyBlockProto proto = OpCopyBlockProto.parseFrom(vintPrefixed(in));
+
+ opCopyBlock(in,
+ fromProto(proto.getHeader().getBlock()),
+ fromProto(proto.getHeader().getToken()));
}
/**
@@ -715,9 +405,11 @@ public interface DataTransferProtocol {
/** Receive OP_BLOCK_CHECKSUM */
private void opBlockChecksum(DataInputStream in) throws IOException {
- final Op.BlockChecksumHeader h = new Op.BlockChecksumHeader();
- h.readFields(in);
- opBlockChecksum(in, h.getBlock(), h.getBlockToken());
+ OpBlockChecksumProto proto = OpBlockChecksumProto.parseFrom(vintPrefixed(in));
+
+ opBlockChecksum(in,
+ fromProto(proto.getHeader().getBlock()),
+ fromProto(proto.getHeader().getToken()));
}
/**
@@ -727,29 +419,13 @@ public interface DataTransferProtocol {
protected abstract void opBlockChecksum(DataInputStream in,
ExtendedBlock blk, Token<BlockTokenIdentifier> blockToken)
throws IOException;
-
- /** Read an array of {@link DatanodeInfo} */
- private static DatanodeInfo[] readDatanodeInfos(final DataInput in
- ) throws IOException {
- final int n = in.readInt();
- if (n < 0) {
- throw new IOException("Mislabelled incoming datastream: "
- + n + " = n < 0");
- }
- final DatanodeInfo[] datanodeinfos= new DatanodeInfo[n];
- for (int i = 0; i < datanodeinfos.length; i++) {
- datanodeinfos[i] = DatanodeInfo.read(in);
- }
- return datanodeinfos;
- }
}
/** reply **/
@InterfaceAudience.Private
@InterfaceStability.Evolving
- public static class PipelineAck implements Writable {
- private long seqno;
- private Status replies[];
+ public static class PipelineAck {
+ PipelineAckProto proto;
public final static long UNKOWN_SEQNO = -2;
/** default constructor **/
@@ -762,8 +438,10 @@ public interface DataTransferProtocol {
* @param replies an array of replies
*/
public PipelineAck(long seqno, Status[] replies) {
- this.seqno = seqno;
- this.replies = replies;
+ proto = PipelineAckProto.newBuilder()
+ .setSeqno(seqno)
+ .addAllStatus(Arrays.asList(replies))
+ .build();
}
/**
@@ -771,7 +449,7 @@ public interface DataTransferProtocol {
* @return the sequence number
*/
public long getSeqno() {
- return seqno;
+ return proto.getSeqno();
}
/**
@@ -779,7 +457,7 @@ public interface DataTransferProtocol {
* @return the number of replies
*/
public short getNumOfReplies() {
- return (short)replies.length;
+ return (short)proto.getStatusCount();
}
/**
@@ -787,11 +465,7 @@ public interface DataTransferProtocol {
* @return the ith reply
*/
public Status getReply(int i) {
- if (i<0 || i>=replies.length) {
- throw new IllegalArgumentException("The input parameter " + i +
- " should in the range of [0, " + replies.length);
- }
- return replies[i];
+ return proto.getStatus(i);
}
/**
@@ -799,8 +473,8 @@ public interface DataTransferProtocol {
* @return true if all statuses are SUCCESS
*/
public boolean isSuccess() {
- for (Status reply : replies) {
- if (reply != Status.SUCCESS) {
+ for (DataTransferProtos.Status reply : proto.getStatusList()) {
+ if (reply != DataTransferProtos.Status.SUCCESS) {
return false;
}
}
@@ -808,54 +482,37 @@ public interface DataTransferProtocol {
}
/**** Writable interface ****/
- @Override // Writable
- public void readFields(DataInput in) throws IOException {
- seqno = in.readLong();
- short numOfReplies = in.readShort();
- replies = new Status[numOfReplies];
- for (int i=0; i<numOfReplies; i++) {
- replies[i] = Status.read(in);
- }
+ public void readFields(InputStream in) throws IOException {
+ proto = PipelineAckProto.parseFrom(vintPrefixed(in));
}
- @Override // Writable
- public void write(DataOutput out) throws IOException {
- //WritableUtils.writeVLong(out, seqno);
- out.writeLong(seqno);
- out.writeShort((short)replies.length);
- for(Status reply : replies) {
- reply.write(out);
- }
+ public void write(OutputStream out) throws IOException {
+ proto.writeDelimitedTo(out);
}
@Override //Object
public String toString() {
- StringBuilder ack = new StringBuilder("Replies for seqno ");
- ack.append( seqno ).append( " are" );
- for(Status reply : replies) {
- ack.append(" ");
- ack.append(reply);
- }
- return ack.toString();
+ return proto.toString();
}
}
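
With PipelineAck reduced to a wrapper over PipelineAckProto, a full round trip is a delimited write plus a parse. A self-contained sketch using only the constructors and accessors shown above:

    PipelineAck sent = new PipelineAck(17L, new Status[] {
        Status.SUCCESS, Status.SUCCESS, Status.ERROR_CHECKSUM });

    ByteArrayOutputStream wire = new ByteArrayOutputStream();
    sent.write(wire);                        // writeDelimitedTo under the hood

    PipelineAck received = new PipelineAck();
    received.readFields(new ByteArrayInputStream(wire.toByteArray()));

    assert received.getSeqno() == 17L;
    assert received.getReply(2) == Status.ERROR_CHECKSUM;
    assert !received.isSuccess();            // one reply is not SUCCESS
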
/**
* Header data for each packet that goes through the read/write pipelines.
*/
- public static class PacketHeader implements Writable {
+ public static class PacketHeader {
/** Header size for a packet */
- public static final int PKT_HEADER_LEN = ( 4 + /* Packet payload length */
- 8 + /* offset in block */
- 8 + /* seqno */
- 1 + /* isLastPacketInBlock */
- 4 /* data length */ );
+ private static final int PROTO_SIZE =
+ PacketHeaderProto.newBuilder()
+ .setOffsetInBlock(0)
+ .setSeqno(0)
+ .setLastPacketInBlock(false)
+ .setDataLen(0)
+ .build().getSerializedSize();
+ public static final int PKT_HEADER_LEN =
+ 6 + PROTO_SIZE;
private int packetLen;
- private long offsetInBlock;
- private long seqno;
- private boolean lastPacketInBlock;
- private int dataLen;
+ private PacketHeaderProto proto;
public PacketHeader() {
}
@@ -863,26 +520,28 @@ public interface DataTransferProtocol {
public PacketHeader(int packetLen, long offsetInBlock, long seqno,
boolean lastPacketInBlock, int dataLen) {
this.packetLen = packetLen;
- this.offsetInBlock = offsetInBlock;
- this.seqno = seqno;
- this.lastPacketInBlock = lastPacketInBlock;
- this.dataLen = dataLen;
+ proto = PacketHeaderProto.newBuilder()
+ .setOffsetInBlock(offsetInBlock)
+ .setSeqno(seqno)
+ .setLastPacketInBlock(lastPacketInBlock)
+ .setDataLen(dataLen)
+ .build();
}
public int getDataLen() {
- return dataLen;
+ return proto.getDataLen();
}
public boolean isLastPacketInBlock() {
- return lastPacketInBlock;
+ return proto.getLastPacketInBlock();
}
public long getSeqno() {
- return seqno;
+ return proto.getSeqno();
}
public long getOffsetInBlock() {
- return offsetInBlock;
+ return proto.getOffsetInBlock();
}
public int getPacketLen() {
@@ -891,57 +550,50 @@ public interface DataTransferProtocol {
@Override
public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("PacketHeader(")
- .append("packetLen=").append(packetLen)
- .append(" offsetInBlock=").append(offsetInBlock)
- .append(" seqno=").append(seqno)
- .append(" lastPacketInBlock=").append(lastPacketInBlock)
- .append(" dataLen=").append(dataLen)
- .append(")");
- return sb.toString();
+ return "PacketHeader with packetLen=" + packetLen +
+ "Header data: " +
+ proto.toString();
}
-
- @Override
- public void readFields(DataInput in) throws IOException {
- // Note that it's important for packetLen to come first and not
- // change format -
- // this is used by BlockReceiver to read entire packets with
- // a single read call.
- packetLen = in.readInt();
- offsetInBlock = in.readLong();
- seqno = in.readLong();
- lastPacketInBlock = in.readBoolean();
- dataLen = in.readInt();
- }
-
+
public void readFields(ByteBuffer buf) throws IOException {
packetLen = buf.getInt();
- offsetInBlock = buf.getLong();
- seqno = buf.getLong();
- lastPacketInBlock = (buf.get() != 0);
- dataLen = buf.getInt();
+ short protoLen = buf.getShort();
+ byte[] data = new byte[protoLen];
+ buf.get(data);
+ proto = PacketHeaderProto.parseFrom(data);
}
-
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeInt(packetLen);
- out.writeLong(offsetInBlock);
- out.writeLong(seqno);
- out.writeBoolean(lastPacketInBlock);
- out.writeInt(dataLen);
+
+ public void readFields(DataInputStream in) throws IOException {
+ this.packetLen = in.readInt();
+ short protoLen = in.readShort();
+ byte[] data = new byte[protoLen];
+ in.readFully(data);
+ proto = PacketHeaderProto.parseFrom(data);
}
+
/**
* Write the header into the buffer.
* This requires that PKT_HEADER_LEN bytes are available.
*/
- public void putInBuffer(ByteBuffer buf) {
- buf.putInt(packetLen)
- .putLong(offsetInBlock)
- .putLong(seqno)
- .put((byte)(lastPacketInBlock ? 1 : 0))
- .putInt(dataLen);
+ public void putInBuffer(final ByteBuffer buf) {
+ assert proto.getSerializedSize() == PROTO_SIZE
+ : "Expected " + (PROTO_SIZE) + " got: " + proto.getSerializedSize();
+ try {
+ buf.putInt(packetLen);
+ buf.putShort((short) proto.getSerializedSize());
+ proto.writeTo(new ByteBufferOutputStream(buf));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void write(DataOutputStream out) throws IOException {
+ assert proto.getSerializedSize() == PROTO_SIZE
+ : "Expected " + (PROTO_SIZE) + " got: " + proto.getSerializedSize();
+ out.writeInt(packetLen);
+ out.writeShort(proto.getSerializedSize());
+ proto.writeTo(out);
}
/**
@@ -951,11 +603,11 @@ public interface DataTransferProtocol {
*/
public boolean sanityCheck(long lastSeqNo) {
// We should only have a non-positive data length for the last packet
- if (dataLen <= 0 && lastPacketInBlock) return false;
+ if (proto.getDataLen() <= 0 && proto.getLastPacketInBlock()) return false;
// The last packet should not contain data
- if (lastPacketInBlock && dataLen != 0) return false;
+ if (proto.getLastPacketInBlock() && proto.getDataLen() != 0) return false;
// Seqnos should always increase by 1 with each packet received
- if (seqno != lastSeqNo + 1) return false;
+ if (proto.getSeqno() != lastSeqNo + 1) return false;
return true;
}
@@ -963,16 +615,12 @@ public interface DataTransferProtocol {
public boolean equals(Object o) {
if (!(o instanceof PacketHeader)) return false;
PacketHeader other = (PacketHeader)o;
- return (other.packetLen == packetLen &&
- other.offsetInBlock == offsetInBlock &&
- other.seqno == seqno &&
- other.lastPacketInBlock == lastPacketInBlock &&
- other.dataLen == dataLen);
+ return this.proto.equals(other.proto);
}
@Override
public int hashCode() {
- return (int)seqno;
+ return (int)proto.getSeqno();
}
}
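
One subtlety worth noting: PKT_HEADER_LEN can only be a compile-time constant because the proto body serializes to the same length for every field value; presumably the .proto declares these fields with fixed-width types (sfixed64/sfixed32) rather than varints, whose encoding grows with the value (the assert in putInBuffer() guards exactly this invariant). A round-trip sketch through a ByteBuffer, using only members shown above:

    PacketHeader sent = new PacketHeader(
        4 + 4 + 512,  // packetLen (illustrative): length word + checksum + data
        0L,           // offsetInBlock
        1L,           // seqno
        false,        // lastPacketInBlock
        512);         // dataLen

    ByteBuffer buf = ByteBuffer.allocate(PacketHeader.PKT_HEADER_LEN);
    sent.putInBuffer(buf);  // 4-byte packetLen + 2-byte proto length + proto body
    buf.flip();

    PacketHeader received = new PacketHeader();
    received.readFields(buf);
    assert received.equals(sent);       // equals() compares the proto fields
    assert received.sanityCheck(0L);    // seqno 1 follows lastSeqNo 0
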
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java?rev=1134492&r1=1134491&r2=1134492&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java Sat Jun 11 00:36:12 2011
@@ -159,6 +159,11 @@ public class DatanodeInfo extends Datano
public void setCapacity(long capacity) {
this.capacity = capacity;
}
+
+ /** Sets the used space for the datanode. */
+ public void setDfsUsed(long dfsUsed) {
+ this.dfsUsed = dfsUsed;
+ }
/** Sets raw free space. */
public void setRemaining(long remaining) {
Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/HdfsProtoUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/HdfsProtoUtil.java?rev=1134492&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/HdfsProtoUtil.java (added)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/HdfsProtoUtil.java Sat Jun 11 00:36:12 2011
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.util.ExactSizeInputStream;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.Token;
+
+import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.CodedInputStream;
+
+/**
+ * Utilities for converting to and from protocol buffers used in the
+ * HDFS wire protocol, as well as some generic utilities useful
+ * for dealing with protocol buffers.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public abstract class HdfsProtoUtil {
+
+ //// Block Token ////
+
+ public static HdfsProtos.BlockTokenIdentifierProto toProto(Token<?> blockToken) {
+ return HdfsProtos.BlockTokenIdentifierProto.newBuilder()
+ .setIdentifier(ByteString.copyFrom(blockToken.getIdentifier()))
+ .setPassword(ByteString.copyFrom(blockToken.getPassword()))
+ .setKind(blockToken.getKind().toString())
+ .setService(blockToken.getService().toString())
+ .build();
+ }
+
+ public static Token<BlockTokenIdentifier> fromProto(HdfsProtos.BlockTokenIdentifierProto proto) {
+ return new Token<BlockTokenIdentifier>(proto.getIdentifier().toByteArray(),
+ proto.getPassword().toByteArray(),
+ new Text(proto.getKind()),
+ new Text(proto.getService()));
+ }
+
+ //// Extended Block ////
+
+ public static HdfsProtos.ExtendedBlockProto toProto(ExtendedBlock block) {
+ return HdfsProtos.ExtendedBlockProto.newBuilder()
+ .setBlockId(block.getBlockId())
+ .setPoolId(block.getBlockPoolId())
+ .setNumBytes(block.getNumBytes())
+ .setGenerationStamp(block.getGenerationStamp())
+ .build();
+ }
+
+ public static ExtendedBlock fromProto(HdfsProtos.ExtendedBlockProto proto) {
+ return new ExtendedBlock(
+ proto.getPoolId(), proto.getBlockId(),
+ proto.getNumBytes(), proto.getGenerationStamp());
+ }
+
+ //// DatanodeID ////
+
+ private static HdfsProtos.DatanodeIDProto toProto(
+ DatanodeID dni) {
+ return HdfsProtos.DatanodeIDProto.newBuilder()
+ .setName(dni.getName())
+ .setStorageID(dni.getStorageID())
+ .setInfoPort(dni.getInfoPort())
+ .build();
+ }
+
+ private static DatanodeID fromProto(HdfsProtos.DatanodeIDProto idProto) {
+ return new DatanodeID(
+ idProto.getName(),
+ idProto.getStorageID(),
+ idProto.getInfoPort(),
+ -1); // ipc port not serialized in writables either
+ }
+
+ //// DatanodeInfo ////
+
+ public static HdfsProtos.DatanodeInfoProto toProto(DatanodeInfo dni) {
+ return HdfsProtos.DatanodeInfoProto.newBuilder()
+ .setId(toProto((DatanodeID)dni))
+ .setCapacity(dni.getCapacity())
+ .setDfsUsed(dni.getDfsUsed())
+ .setRemaining(dni.getRemaining())
+ .setBlockPoolUsed(dni.getBlockPoolUsed())
+ .setLastUpdate(dni.getLastUpdate())
+ .setXceiverCount(dni.getXceiverCount())
+ .setLocation(dni.getNetworkLocation())
+ .setHostName(dni.getHostName())
+ .setAdminState(HdfsProtos.DatanodeInfoProto.AdminState.valueOf(
+ dni.getAdminState().name()))
+ .build();
+ }
+
+ public static DatanodeInfo fromProto(HdfsProtos.DatanodeInfoProto dniProto) {
+ DatanodeInfo dniObj = new DatanodeInfo(fromProto(dniProto.getId()),
+ dniProto.getLocation(), dniProto.getHostName());
+
+ dniObj.setCapacity(dniProto.getCapacity());
+ dniObj.setDfsUsed(dniProto.getDfsUsed());
+ dniObj.setRemaining(dniProto.getRemaining());
+ dniObj.setBlockPoolUsed(dniProto.getBlockPoolUsed());
+ dniObj.setLastUpdate(dniProto.getLastUpdate());
+ dniObj.setXceiverCount(dniProto.getXceiverCount());
+ dniObj.setAdminState(DatanodeInfo.AdminStates.valueOf(
+ dniProto.getAdminState().name()));
+ return dniObj;
+ }
+
+ public static ArrayList<? extends HdfsProtos.DatanodeInfoProto> toProtos(
+ DatanodeInfo[] dnInfos, int startIdx) {
+ ArrayList<HdfsProtos.DatanodeInfoProto> protos =
+ Lists.newArrayListWithCapacity(dnInfos.length);
+ for (int i = startIdx; i < dnInfos.length; i++) {
+ protos.add(toProto(dnInfos[i]));
+ }
+ return protos;
+ }
+
+ public static DatanodeInfo[] fromProtos(
+ List<HdfsProtos.DatanodeInfoProto> targetsList) {
+ DatanodeInfo[] ret = new DatanodeInfo[targetsList.size()];
+ int i = 0;
+ for (HdfsProtos.DatanodeInfoProto proto : targetsList) {
+ ret[i++] = fromProto(proto);
+ }
+ return ret;
+ }
+
+ public static InputStream vintPrefixed(final InputStream input)
+ throws IOException {
+ final int firstByte = input.read();
+ if (firstByte == -1) {
+ throw new EOFException("Premature EOF: no length prefix available");
+ }
+
+ int size = CodedInputStream.readRawVarint32(firstByte, input);
+ assert size >= 0;
+
+ return new ExactSizeInputStream(input, size);
+ }
+}
\ No newline at end of file
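
Putting the pieces of this file together: convert a native type to its proto, delimit it onto a stream, then undo both steps with vintPrefixed(). A sketch round-tripping an ExtendedBlock through the helpers defined above:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;

    import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
    import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
    import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;

    public class ExtendedBlockRoundTrip {
      public static void main(String[] args) throws Exception {
        ExtendedBlock blk = new ExtendedBlock("BP-sketch", 42L, 1024L, 7L);

        // Native -> proto -> wire (varint length prefix + body).
        ByteArrayOutputStream wire = new ByteArrayOutputStream();
        HdfsProtoUtil.toProto(blk).writeDelimitedTo(wire);

        // Wire -> proto -> native; vintPrefixed() bounds the read to one message.
        HdfsProtos.ExtendedBlockProto proto = HdfsProtos.ExtendedBlockProto.parseFrom(
            HdfsProtoUtil.vintPrefixed(
                new ByteArrayInputStream(wire.toByteArray())));
        ExtendedBlock back = HdfsProtoUtil.fromProto(proto);

        assert back.getBlockId() == blk.getBlockId();
        assert back.getBlockPoolId().equals(blk.getBlockPoolId());
      }
    }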