Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2011/06/11 02:36:13 UTC
svn commit: r1134492 [4/4] - in /hadoop/hdfs/trunk: ./
src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/
src/java/org/apache/hadoop/hdfs/protocol/proto/
src/java/org/apache/hadoop/hdfs/server/balancer/ src/java/org/apache/hadoo...
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1134492&r1=1134491&r2=1134492&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Sat Jun 11 00:36:12 2011
@@ -17,8 +17,6 @@
*/
package org.apache.hadoop.hdfs.server.balancer;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR_ACCESS_TOKEN;
-
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
@@ -58,6 +56,9 @@ import org.apache.hadoop.hdfs.protocol.D
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.Util;
@@ -73,6 +74,8 @@ import org.apache.hadoop.util.StringUtil
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
+import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed;
+
/** <p>The balancer is a tool that balances disk space usage on an HDFS cluster
* when some datanodes become full or when new empty nodes join the cluster.
* The tool is deployed as an application program that can be run by the
@@ -352,9 +355,10 @@ public class Balancer {
/* Receive a block copy response from the input stream */
private void receiveResponse(DataInputStream in) throws IOException {
- DataTransferProtocol.Status status = DataTransferProtocol.Status.read(in);
- if (status != DataTransferProtocol.Status.SUCCESS) {
- if (status == ERROR_ACCESS_TOKEN)
+ BlockOpResponseProto response = BlockOpResponseProto.parseFrom(
+ vintPrefixed(in));
+ if (response.getStatus() != Status.SUCCESS) {
+ if (response.getStatus() == Status.ERROR_ACCESS_TOKEN)
throw new IOException("block move failed due to access token error");
throw new IOException("block move is failed");
}
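
For reference, the read side of the new framing can be sketched using nothing but the generated DataTransferProtos classes (the helper name below is illustrative and not part of this commit); protobuf's standard parseDelimitedFrom consumes the same varint length prefix that HdfsProtoUtil.vintPrefixed strips before parseFrom:

import java.io.DataInputStream;
import java.io.IOException;

import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;

// Hypothetical helper: read one length-prefixed block-op response and fail
// the block move on anything other than SUCCESS.
class BlockMoveResponseReader {
  static void checkBlockMoveResponse(DataInputStream in) throws IOException {
    BlockOpResponseProto response = BlockOpResponseProto.parseDelimitedFrom(in);
    if (response == null) {
      throw new IOException("Premature EOF waiting for block move response");
    }
    if (response.getStatus() == Status.ERROR_ACCESS_TOKEN) {
      throw new IOException("block move failed due to access token error");
    }
    if (response.getStatus() != Status.SUCCESS) {
      throw new IOException("block move failed with status " + response.getStatus());
    }
  }
}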
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1134492&r1=1134491&r2=1134492&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Sat Jun 11 00:36:12 2011
@@ -17,8 +17,6 @@
*/
package org.apache.hadoop.hdfs.server.datanode;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.SUCCESS;
import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT;
import java.io.BufferedOutputStream;
@@ -39,11 +37,11 @@ import org.apache.hadoop.fs.FSOutputSumm
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
@@ -996,13 +994,13 @@ class BlockReceiver implements Closeable
Status[] replies = null;
if (mirrorError) { // ack read error
replies = new Status[2];
- replies[0] = SUCCESS;
- replies[1] = ERROR;
+ replies[0] = Status.SUCCESS;
+ replies[1] = Status.ERROR;
} else {
short ackLen = type == PacketResponderType.LAST_IN_PIPELINE? 0
: ack.getNumOfReplies();
replies = new Status[1+ackLen];
- replies[0] = SUCCESS;
+ replies[0] = Status.SUCCESS;
for (int i=0; i<ackLen; i++) {
replies[i+1] = ack.getReply(i);
}
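
The Status values assembled above feed the pipeline ack. A minimal sketch of building the underlying message directly, assuming only the generated DataTransferProtos classes (the PipelineAck wrapper used elsewhere in this commit presumably wraps something like this, so treat the sketch as an assumption, not the wrapper's actual implementation):

import java.util.Arrays;

import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;

class PipelineAckSketch {
  // One status per reply slot: the local result first, then whatever the
  // downstream datanodes reported.
  static PipelineAckProto buildAck(long seqno, Status... replies) {
    return PipelineAckProto.newBuilder()
        .setSeqno(seqno)
        .addAllStatus(Arrays.asList(replies))
        .build();
  }
}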
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1134492&r1=1134491&r2=1134492&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Sat Jun 11 00:36:12 2011
@@ -19,8 +19,6 @@ package org.apache.hadoop.hdfs.server.da
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR_ACCESS_TOKEN;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.SUCCESS;
import static org.apache.hadoop.hdfs.server.common.Util.now;
import java.io.BufferedOutputStream;
@@ -75,8 +73,11 @@ import org.apache.hadoop.hdfs.protocol.D
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DNTransferAckProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
@@ -133,6 +134,7 @@ import org.apache.hadoop.util.StringUtil
import org.apache.hadoop.util.VersionInfo;
import org.mortbay.util.ajax.JSON;
+
/**********************************************************
* DataNode is a class (and program) that stores a set of
* blocks for a DFS deployment. A single deployment can
@@ -1958,12 +1960,13 @@ public class DataNode extends Configured
// read ack
if (isClient) {
in = new DataInputStream(NetUtils.getInputStream(sock));
- final DataTransferProtocol.Status s = DataTransferProtocol.Status.read(in);
+ DNTransferAckProto closeAck = DNTransferAckProto.parseFrom(
+ HdfsProtoUtil.vintPrefixed(in));
if (LOG.isDebugEnabled()) {
- LOG.debug(getClass().getSimpleName() + ": close-ack=" + s);
+ LOG.debug(getClass().getSimpleName() + ": close-ack=" + closeAck);
}
- if (s != SUCCESS) {
- if (s == ERROR_ACCESS_TOKEN) {
+ if (closeAck.getStatus() != Status.SUCCESS) {
+ if (closeAck.getStatus() == Status.ERROR_ACCESS_TOKEN) {
throw new InvalidBlockTokenException(
"Got access token error for connect ack, targets="
+ Arrays.asList(targets));
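
The DNTransferAckProto parsed here is written by the receiving datanode; a hedged sketch of that write side, using only the generated class and standard delimited framing (class and method names are illustrative):

import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DNTransferAckProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;

class TransferAckWriter {
  // Acknowledge a datanode-to-datanode transfer with a single status.
  static void sendAck(DataOutputStream out, Status status) throws IOException {
    DNTransferAckProto.newBuilder()
        .setStatus(status)
        .build()
        .writeDelimitedTo(out);
    out.flush();
  }
}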
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1134492&r1=1134491&r2=1134492&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Sat Jun 11 00:36:12 2011
@@ -17,9 +17,8 @@
*/
package org.apache.hadoop.hdfs.server.datanode;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR_ACCESS_TOKEN;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.SUCCESS;
+import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.*;
+import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import static org.apache.hadoop.hdfs.server.common.Util.now;
import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT;
@@ -44,6 +43,11 @@ import org.apache.hadoop.hdfs.protocol.D
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
+
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
@@ -51,7 +55,6 @@ import org.apache.hadoop.hdfs.server.dat
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MD5Hash;
-import org.apache.hadoop.io.Text;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableRate;
import org.apache.hadoop.net.NetUtils;
@@ -60,6 +63,9 @@ import org.apache.hadoop.security.token.
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.StringUtils;
+import com.google.protobuf.ByteString;
+
+
/**
* Thread for processing incoming/outgoing data stream.
*/
@@ -224,16 +230,19 @@ class DataXceiver extends DataTransferPr
sendResponse(s, ERROR, datanode.socketWriteTimeout);
throw e;
}
+
+ // send op status
+ sendResponse(s, SUCCESS, datanode.socketWriteTimeout);
- SUCCESS.write(out); // send op status
long read = blockSender.sendBlock(out, baseStream, null); // send data
if (blockSender.didSendEntireByteRange()) {
// If we sent the entire range, then we should expect the client
// to respond with a Status enum.
try {
- DataTransferProtocol.Status stat = DataTransferProtocol.Status.read(in);
- if (stat == null) {
+ ClientReadStatusProto stat = ClientReadStatusProto.parseFrom(
+ HdfsProtoUtil.vintPrefixed(in));
+ if (!stat.hasStatus()) {
LOG.warn("Client " + s.getInetAddress() + " did not send a valid status " +
"code after reading. Will close connection.");
IOUtils.closeStream(out);
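
The ClientReadStatusProto read above is sent by the client once it has finished the requested range; the client-side counterpart presumably looks like the following sketch (names and the exact status values chosen are illustrative, not taken from this commit):

import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;

class ClientReadStatusWriter {
  // After verifying checksums on the whole range, tell the datanode the
  // result; ERROR_CHECKSUM would report a bad replica.
  static void sendReadResult(DataOutputStream out, boolean checksumsOk)
      throws IOException {
    ClientReadStatusProto.newBuilder()
        .setStatus(checksumsOk ? Status.CHECKSUM_OK : Status.ERROR_CHECKSUM)
        .build()
        .writeDelimitedTo(out);
    out.flush();
  }
}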
@@ -329,7 +338,7 @@ class DataXceiver extends DataTransferPr
BlockReceiver blockReceiver = null; // responsible for data handling
String mirrorNode = null; // the name:port of next target
String firstBadLink = ""; // first datanode that failed in connection setup
- DataTransferProtocol.Status mirrorInStatus = SUCCESS;
+ Status mirrorInStatus = SUCCESS;
try {
if (isDatanode ||
stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
@@ -377,8 +386,10 @@ class DataXceiver extends DataTransferPr
// read connect ack (only for clients, not for replication req)
if (isClient) {
- mirrorInStatus = DataTransferProtocol.Status.read(mirrorIn);
- firstBadLink = Text.readString(mirrorIn);
+ BlockOpResponseProto connectAck =
+ BlockOpResponseProto.parseFrom(HdfsProtoUtil.vintPrefixed(mirrorIn));
+ mirrorInStatus = connectAck.getStatus();
+ firstBadLink = connectAck.getFirstBadLink();
if (LOG.isDebugEnabled() || mirrorInStatus != SUCCESS) {
LOG.info("Datanode " + targets.length +
" got response for connect ack " +
@@ -389,8 +400,11 @@ class DataXceiver extends DataTransferPr
} catch (IOException e) {
if (isClient) {
- ERROR.write(replyOut);
- Text.writeString(replyOut, mirrorNode);
+ BlockOpResponseProto.newBuilder()
+ .setStatus(ERROR)
+ .setFirstBadLink(mirrorNode)
+ .build()
+ .writeDelimitedTo(replyOut);
replyOut.flush();
}
IOUtils.closeStream(mirrorOut);
@@ -417,8 +431,11 @@ class DataXceiver extends DataTransferPr
" forwarding connect ack to upstream firstbadlink is " +
firstBadLink);
}
- mirrorInStatus.write(replyOut);
- Text.writeString(replyOut, firstBadLink);
+ BlockOpResponseProto.newBuilder()
+ .setStatus(mirrorInStatus)
+ .setFirstBadLink(firstBadLink)
+ .build()
+ .writeDelimitedTo(replyOut);
replyOut.flush();
}
@@ -433,7 +450,7 @@ class DataXceiver extends DataTransferPr
if (LOG.isTraceEnabled()) {
LOG.trace("TRANSFER: send close-ack");
}
- SUCCESS.write(replyOut);
+ writeResponse(SUCCESS, replyOut);
}
}
@@ -489,12 +506,12 @@ class DataXceiver extends DataTransferPr
NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
try {
datanode.transferReplicaForPipelineRecovery(blk, targets, client);
- SUCCESS.write(out);
+ writeResponse(Status.SUCCESS, out);
} finally {
IOUtils.closeStream(out);
}
}
-
+
/**
* Get block checksum (MD5 of CRC32).
*/
@@ -530,10 +547,15 @@ class DataXceiver extends DataTransferPr
}
//write reply
- SUCCESS.write(out);
- out.writeInt(bytesPerCRC);
- out.writeLong(crcPerBlock);
- md5.write(out);
+ BlockOpResponseProto.newBuilder()
+ .setStatus(SUCCESS)
+ .setChecksumResponse(OpBlockChecksumResponseProto.newBuilder()
+ .setBytesPerCrc(bytesPerCRC)
+ .setCrcPerBlock(crcPerBlock)
+ .setMd5(ByteString.copyFrom(md5.getDigest()))
+ )
+ .build()
+ .writeDelimitedTo(out);
out.flush();
} finally {
IOUtils.closeStream(out);
@@ -590,7 +612,7 @@ class DataXceiver extends DataTransferPr
baseStream, SMALL_BUFFER_SIZE));
// send status first
- SUCCESS.write(reply);
+ writeResponse(SUCCESS, reply);
// send block content to the target
long read = blockSender.sendBlock(reply, baseStream,
dataXceiverServer.balanceThrottler);
@@ -653,7 +675,7 @@ class DataXceiver extends DataTransferPr
Socket proxySock = null;
DataOutputStream proxyOut = null;
- DataTransferProtocol.Status opStatus = SUCCESS;
+ Status opStatus = SUCCESS;
BlockReceiver blockReceiver = null;
DataInputStream proxyReply = null;
@@ -676,10 +698,11 @@ class DataXceiver extends DataTransferPr
// receive the response from the proxy
proxyReply = new DataInputStream(new BufferedInputStream(
NetUtils.getInputStream(proxySock), BUFFER_SIZE));
- final DataTransferProtocol.Status status
- = DataTransferProtocol.Status.read(proxyReply);
- if (status != SUCCESS) {
- if (status == ERROR_ACCESS_TOKEN) {
+ BlockOpResponseProto copyResponse = BlockOpResponseProto.parseFrom(
+ HdfsProtoUtil.vintPrefixed(proxyReply));
+
+ if (copyResponse.getStatus() != SUCCESS) {
+ if (copyResponse.getStatus() == ERROR_ACCESS_TOKEN) {
throw new IOException("Copy block " + block + " from "
+ proxySock.getRemoteSocketAddress()
+ " failed due to access token error");
@@ -748,13 +771,24 @@ class DataXceiver extends DataTransferPr
* @param opStatus status message to write
* @param timeout send timeout
**/
- private void sendResponse(Socket s, DataTransferProtocol.Status opStatus,
+ private void sendResponse(Socket s, Status status,
long timeout) throws IOException {
DataOutputStream reply =
new DataOutputStream(NetUtils.getOutputStream(s, timeout));
- opStatus.write(reply);
- reply.flush();
+
+ writeResponse(status, reply);
+ }
+
+ private void writeResponse(Status status, OutputStream out)
+ throws IOException {
+ BlockOpResponseProto response = BlockOpResponseProto.newBuilder()
+ .setStatus(status)
+ .build();
+
+ response.writeDelimitedTo(out);
+ out.flush();
}
+
private void checkAccess(DataOutputStream out, final boolean reply,
final ExtendedBlock blk,
@@ -771,12 +805,15 @@ class DataXceiver extends DataTransferPr
out = new DataOutputStream(
NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
}
- ERROR_ACCESS_TOKEN.write(out);
+
+ BlockOpResponseProto.Builder resp = BlockOpResponseProto.newBuilder()
+ .setStatus(ERROR_ACCESS_TOKEN);
if (mode == BlockTokenSecretManager.AccessMode.WRITE) {
DatanodeRegistration dnR =
datanode.getDNRegistrationForBP(blk.getBlockPoolId());
- Text.writeString(out, dnR.getName());
+ resp.setFirstBadLink(dnR.getName());
}
+ resp.build().writeDelimitedTo(out);
out.flush();
}
LOG.warn("Block token verification failed: op=" + op
Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/util/ByteBufferOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/util/ByteBufferOutputStream.java?rev=1134492&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/util/ByteBufferOutputStream.java (added)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/util/ByteBufferOutputStream.java Sat Jun 11 00:36:12 2011
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.util;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * OutputStream that writes into a {@link ByteBuffer}.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Stable
+public class ByteBufferOutputStream extends OutputStream {
+
+ private final ByteBuffer buf;
+
+ public ByteBufferOutputStream(ByteBuffer buf) {
+ this.buf = buf;
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ buf.put((byte)b);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ buf.put(b, off, len);
+ }
+}
\ No newline at end of file
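
A short usage sketch for the new ByteBufferOutputStream: serialize a small message straight into a pre-sized buffer. The stream does not grow the buffer, so an undersized one would throw BufferOverflowException (the surrounding class is illustrative only):

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.util.ByteBufferOutputStream;

class ByteBufferOutputStreamExample {
  // Serialize a small proto straight into a pre-allocated buffer.
  static ByteBuffer serialize() throws IOException {
    ClientReadStatusProto msg = ClientReadStatusProto.newBuilder()
        .setStatus(Status.CHECKSUM_OK)
        .build();
    ByteBuffer buf = ByteBuffer.allocate(msg.getSerializedSize());
    msg.writeTo(new ByteBufferOutputStream(buf));
    buf.flip(); // ready for reading or for a channel write
    return buf;
  }
}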
Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/util/ExactSizeInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/util/ExactSizeInputStream.java?rev=1134492&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/util/ExactSizeInputStream.java (added)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/util/ExactSizeInputStream.java Sat Jun 11 00:36:12 2011
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.util;
+
+import java.io.EOFException;
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * An InputStream implementation that reads from some other InputStream
+ * but expects an exact number of bytes. Any attempts to read past the
+ * specified number of bytes will return as if the end of the stream
+ * was reached. If the end of the underlying stream is reached prior to
+ * the specified number of bytes, an EOFException is thrown.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class ExactSizeInputStream extends FilterInputStream {
+ private int remaining;
+
+ /**
+ * Construct an input stream that will read no more than
+ * 'numBytes' bytes.
+ *
+ * If an EOF occurs on the underlying stream before numBytes
+ * bytes have been read, an EOFException will be thrown.
+ *
+ * @param in the inputstream to wrap
+ * @param numBytes the number of bytes to read
+ */
+ public ExactSizeInputStream(InputStream in, int numBytes) {
+ super(in);
+ Preconditions.checkArgument(numBytes >= 0,
+ "Negative expected bytes: ", numBytes);
+ this.remaining = numBytes;
+ }
+
+ @Override
+ public int available() throws IOException {
+ return Math.min(super.available(), remaining);
+ }
+
+ @Override
+ public int read() throws IOException {
+ // EOF if we reached our limit
+ if (remaining <= 0) {
+ return -1;
+ }
+ final int result = super.read();
+ if (result >= 0) {
+ --remaining;
+ } else if (remaining > 0) {
+ // Underlying stream reached EOF but we haven't read the expected
+ // number of bytes.
+ throw new EOFException(
+ "Premature EOF. Expected " + remaining + "more bytes");
+ }
+ return result;
+ }
+
+ @Override
+ public int read(final byte[] b, final int off, int len)
+ throws IOException {
+ if (remaining <= 0) {
+ return -1;
+ }
+ len = Math.min(len, remaining);
+ final int result = super.read(b, off, len);
+ if (result >= 0) {
+ remaining -= result;
+ } else if (remaining > 0) {
+ // Underlying stream reached EOF but we haven't read the expected
+ // number of bytes.
+ throw new EOFException(
+ "Premature EOF. Expected " + remaining + "more bytes");
+ }
+ return result;
+ }
+
+ @Override
+ public long skip(final long n) throws IOException {
+ final long result = super.skip(Math.min(n, remaining));
+ if (result > 0) {
+ remaining -= result;
+ } else if (remaining > 0) {
+ // Underlying stream reached EOF but we haven't read the expected
+ // number of bytes.
+ throw new EOFException(
+ "Premature EOF. Expected " + remaining + "more bytes");
+ }
+ return result;
+ }
+
+ @Override
+ public boolean markSupported() {
+ return false;
+ }
+
+ @Override
+ public void mark(int readlimit) {
+ throw new UnsupportedOperationException();
+ }
+
+}
\ No newline at end of file
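
And a usage sketch for ExactSizeInputStream: bound a read to a known payload length so a misbehaving parser can never run past the frame (the helper name is illustrative):

import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.hdfs.util.ExactSizeInputStream;

class ExactSizeExample {
  // Read exactly 'len' payload bytes from 'in' without ever consuming more,
  // even if the caller over-reads; the wrapper reports EOF at the boundary.
  static byte[] readExactly(InputStream in, int len) throws IOException {
    DataInputStream framed = new DataInputStream(new ExactSizeInputStream(in, len));
    byte[] payload = new byte[len];
    framed.readFully(payload); // throws EOFException if 'in' is short
    return payload;
  }
}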
Added: hadoop/hdfs/trunk/src/proto/datatransfer.proto
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/proto/datatransfer.proto?rev=1134492&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/proto/datatransfer.proto (added)
+++ hadoop/hdfs/trunk/src/proto/datatransfer.proto Sat Jun 11 00:36:12 2011
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// This file contains protocol buffers that are used to transfer data
+// to and from the datanode, as well as between datanodes.
+
+option java_package = "org.apache.hadoop.hdfs.protocol.proto";
+option java_outer_classname = "DataTransferProtos";
+option java_generate_equals_and_hash = true;
+
+import "hdfs.proto";
+
+message BaseHeaderProto {
+ required ExtendedBlockProto block = 1;
+ optional BlockTokenIdentifierProto token = 2;
+}
+
+message ClientOperationHeaderProto {
+ required BaseHeaderProto baseHeader = 1;
+ required string clientName = 2;
+}
+
+message OpReadBlockProto {
+ required ClientOperationHeaderProto header = 1;
+ required uint64 offset = 2;
+ required uint64 len = 3;
+}
+
+message OpWriteBlockProto {
+ required ClientOperationHeaderProto header = 1;
+ repeated DatanodeInfoProto targets = 2;
+ optional DatanodeInfoProto source = 3;
+ enum BlockConstructionStage {
+ PIPELINE_SETUP_APPEND = 0;
+ // pipeline set up for failed PIPELINE_SETUP_APPEND recovery
+ PIPELINE_SETUP_APPEND_RECOVERY = 1;
+ // data streaming
+ DATA_STREAMING = 2;
+ // pipeline setup for failed data streaming recovery
+ PIPELINE_SETUP_STREAMING_RECOVERY = 3;
+ // close the block and pipeline
+ PIPELINE_CLOSE = 4;
+ // Recover a failed PIPELINE_CLOSE
+ PIPELINE_CLOSE_RECOVERY = 5;
+ // pipeline set up for block creation
+ PIPELINE_SETUP_CREATE = 6;
+ // transfer RBW for adding datanodes
+ TRANSFER_RBW = 7;
+ // transfer Finalized for adding datanodes
+ TRANSFER_FINALIZED = 8;
+ }
+ required BlockConstructionStage stage = 4;
+ required uint32 pipelineSize = 5;
+ required uint64 minBytesRcvd = 6;
+ required uint64 maxBytesRcvd = 7;
+ required uint64 latestGenerationStamp = 8;
+}
+
+message OpTransferBlockProto {
+ required ClientOperationHeaderProto header = 1;
+ repeated DatanodeInfoProto targets = 2;
+}
+
+message OpReplaceBlockProto {
+ required BaseHeaderProto header = 1;
+ required string delHint = 2;
+ required DatanodeInfoProto source = 3;
+}
+
+message OpCopyBlockProto {
+ required BaseHeaderProto header = 1;
+}
+
+message OpBlockChecksumProto {
+ required BaseHeaderProto header = 1;
+}
+
+
+message PacketHeaderProto {
+ // All fields must be fixed-length!
+ required sfixed64 offsetInBlock = 1;
+ required sfixed64 seqno = 2;
+ required bool lastPacketInBlock = 3;
+ required sfixed32 dataLen = 4;
+}
+
+enum Status {
+ SUCCESS = 0;
+ ERROR = 1;
+ ERROR_CHECKSUM = 2;
+ ERROR_INVALID = 3;
+ ERROR_EXISTS = 4;
+ ERROR_ACCESS_TOKEN = 5;
+ CHECKSUM_OK = 6;
+}
+
+message PipelineAckProto {
+ required sint64 seqno = 1;
+ repeated Status status = 2;
+}
+
+message BlockOpResponseProto {
+ required Status status = 1;
+
+ optional string firstBadLink = 2;
+ optional OpBlockChecksumResponseProto checksumResponse = 3;
+}
+
+/**
+ * Message sent from the client to the DN after reading the entire
+ * read request.
+ */
+message ClientReadStatusProto {
+ required Status status = 1;
+}
+
+message DNTransferAckProto {
+ required Status status = 1;
+}
+
+message OpBlockChecksumResponseProto {
+ required uint32 bytesPerCrc = 1;
+ required uint64 crcPerBlock = 2;
+ required bytes md5 = 3;
+}
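
The messages above travel with a varint length prefix; writeDelimitedTo and parseDelimitedFrom give exactly that framing, which the Java changes in this commit pair with parseFrom(vintPrefixed(in)). A self-contained round trip, with made-up field values:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;

class FramingRoundTrip {
  public static void main(String[] args) throws IOException {
    BlockOpResponseProto sent = BlockOpResponseProto.newBuilder()
        .setStatus(Status.ERROR_ACCESS_TOKEN)
        .setFirstBadLink("127.0.0.1:50010")
        .build();

    // writeDelimitedTo prepends the serialized size as a protobuf varint.
    ByteArrayOutputStream wire = new ByteArrayOutputStream();
    sent.writeDelimitedTo(wire);

    // parseDelimitedFrom reads that varint and then exactly that many bytes.
    BlockOpResponseProto received = BlockOpResponseProto.parseDelimitedFrom(
        new ByteArrayInputStream(wire.toByteArray()));
    System.out.println("status=" + received.getStatus()
        + " firstBadLink=" + received.getFirstBadLink());
  }
}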
Added: hadoop/hdfs/trunk/src/proto/hdfs.proto
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/proto/hdfs.proto?rev=1134492&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/proto/hdfs.proto (added)
+++ hadoop/hdfs/trunk/src/proto/hdfs.proto Sat Jun 11 00:36:12 2011
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// This file contains protocol buffers that are used throughout HDFS -- i.e.
+// by the client, server, and data transfer protocols.
+
+option java_package = "org.apache.hadoop.hdfs.protocol.proto";
+option java_outer_classname = "HdfsProtos";
+option java_generate_equals_and_hash = true;
+
+message ExtendedBlockProto {
+ required string poolId = 1;
+ required uint64 blockId = 2;
+ required uint64 numBytes = 3;
+ required uint64 generationStamp = 4;
+}
+
+message BlockTokenIdentifierProto {
+ required bytes identifier = 1;
+ required bytes password = 2;
+ required string kind = 3;
+ required string service = 4;
+}
+
+message DatanodeIDProto {
+ required string name = 1;
+ required string storageID = 2;
+ required uint32 infoPort = 3;
+}
+
+message DatanodeInfoProto {
+ required DatanodeIDProto id = 1;
+ optional uint64 capacity = 2;
+ optional uint64 dfsUsed = 3;
+ optional uint64 remaining = 4;
+ optional uint64 blockPoolUsed = 5;
+ optional uint64 lastUpdate = 6;
+ optional uint32 xceiverCount = 7;
+ optional string location = 8;
+ optional string hostName = 9;
+ enum AdminState {
+ NORMAL = 0;
+ DECOMMISSION_INPROGRESS = 1;
+ DECOMMISSIONED = 2;
+ }
+
+ optional AdminState adminState = 10;
+}
+
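
hdfs.proto carries the block and datanode descriptions shared by the data transfer messages; a small sketch of populating the required fields (all values below are made up):

import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;

class HdfsProtosExample {
  // Populate the required fields of the shared block message.
  static ExtendedBlockProto exampleBlock() {
    return ExtendedBlockProto.newBuilder()
        .setPoolId("BP-1")
        .setBlockId(1073741825L)
        .setNumBytes(0L)
        .setGenerationStamp(1001L)
        .build();
  }

  // Only the nested DatanodeIDProto fields are required here.
  static DatanodeInfoProto exampleDatanode() {
    return DatanodeInfoProto.newBuilder()
        .setId(DatanodeIDProto.newBuilder()
            .setName("127.0.0.1:50010")
            .setStorageID("DS-1")
            .setInfoPort(50075))
        .build();
  }
}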
Modified: hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj?rev=1134492&r1=1134491&r2=1134492&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj (original)
+++ hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj Sat Jun 11 00:36:12 2011
@@ -17,9 +17,8 @@
*/
package org.apache.hadoop.hdfs.server.datanode;
-import java.io.DataInput;
-import java.io.DataOutput;
import java.io.IOException;
+import java.io.InputStream;
import java.io.OutputStream;
import org.apache.commons.logging.Log;
@@ -33,7 +32,6 @@ import org.apache.hadoop.hdfs.server.dat
import org.apache.hadoop.hdfs.PipelinesTestUtil.PipelinesTest;
import org.apache.hadoop.hdfs.PipelinesTestUtil.NodeBytes;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
@@ -83,7 +81,7 @@ privileged public aspect BlockReceiverAs
}
pointcut afterDownstreamStatusRead(BlockReceiver.PacketResponder responder):
- call(void PipelineAck.readFields(DataInput)) && this(responder);
+ call(void PipelineAck.readFields(InputStream)) && this(responder);
after(BlockReceiver.PacketResponder responder)
throws IOException: afterDownstreamStatusRead(responder) {
@@ -150,7 +148,7 @@ privileged public aspect BlockReceiverAs
}
pointcut preventAckSending () :
- call (void PipelineAck.write(DataOutput))
+ call (void PipelineAck.write(OutputStream))
&& within (PacketResponder);
static int ackCounter = 0;
@@ -203,7 +201,7 @@ privileged public aspect BlockReceiverAs
}
pointcut pipelineAck(BlockReceiver.PacketResponder packetresponder) :
- call (void PipelineAck.readFields(DataInput))
+ call (void PipelineAck.readFields(InputStream))
&& this(packetresponder);
after(BlockReceiver.PacketResponder packetresponder) throws IOException
Modified: hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/DataTransferProtocolAspects.aj
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/DataTransferProtocolAspects.aj?rev=1134492&r1=1134491&r2=1134492&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/DataTransferProtocolAspects.aj (original)
+++ hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/DataTransferProtocolAspects.aj Sat Jun 11 00:36:12 2011
@@ -19,15 +19,16 @@ package org.apache.hadoop.hdfs.server.da
import java.io.DataInput;
import java.io.DataInputStream;
+import java.io.InputStream;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fi.DataTransferTestUtil;
import org.apache.hadoop.fi.DataTransferTestUtil.DataTransferTest;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Op;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Receiver;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status;
/** Aspect for DataTransferProtocol */
public aspect DataTransferProtocolAspects {
@@ -53,9 +54,9 @@ public aspect DataTransferProtocolAspect
}
pointcut statusRead(DataXceiver dataxceiver):
- call(Status Status.read(DataInput)) && this(dataxceiver);
+ call(BlockOpResponseProto BlockOpResponseProto.parseFrom(InputStream)) && this(dataxceiver);
- after(DataXceiver dataxceiver) returning(Status status
+ after(DataXceiver dataxceiver) returning(BlockOpResponseProto status
) throws IOException: statusRead(dataxceiver) {
final DataNode d = dataxceiver.getDataNode();
LOG.info("FI: statusRead " + status + ", datanode="
Modified: hadoop/hdfs/trunk/src/test/findbugsExcludeFile.xml
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/findbugsExcludeFile.xml?rev=1134492&r1=1134491&r2=1134492&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/findbugsExcludeFile.xml (original)
+++ hadoop/hdfs/trunk/src/test/findbugsExcludeFile.xml Sat Jun 11 00:36:12 2011
@@ -3,6 +3,9 @@
<Package name="org.apache.hadoop.record.compiler.generated" />
</Match>
<Match>
+ <Package name="org.apache.hadoop.hdfs.protocol.proto" />
+ </Match>
+ <Match>
<Bug pattern="EI_EXPOSE_REP" />
</Match>
<Match>
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java?rev=1134492&r1=1134491&r2=1134492&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java Sat Jun 11 00:36:12 2011
@@ -57,6 +57,7 @@ import org.apache.hadoop.hdfs.protocol.D
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -66,7 +67,6 @@ import org.apache.hadoop.hdfs.server.nam
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.ShellBasedUnixGroupsMapping;
import org.apache.hadoop.security.UserGroupInformation;
@@ -662,7 +662,7 @@ public class DFSTestUtil {
}
/** For {@link TestTransferRbw} */
- public static DataTransferProtocol.Status transferRbw(final ExtendedBlock b,
+ public static BlockOpResponseProto transferRbw(final ExtendedBlock b,
final DFSClient dfsClient, final DatanodeInfo... datanodes) throws IOException {
assertEquals(2, datanodes.length);
final Socket s = DFSOutputStream.createSocketForPipeline(datanodes[0],
@@ -678,6 +678,6 @@ public class DFSTestUtil {
new DatanodeInfo[]{datanodes[1]}, new Token<BlockTokenIdentifier>());
out.flush();
- return DataTransferProtocol.Status.read(in);
+ return BlockOpResponseProto.parseDelimitedFrom(in);
}
}
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java?rev=1134492&r1=1134491&r2=1134492&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java Sat Jun 11 00:36:12 2011
@@ -20,8 +20,8 @@ package org.apache.hadoop.hdfs;
import java.util.List;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.fs.Path;
import org.junit.Test;
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java?rev=1134492&r1=1134491&r2=1134492&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java Sat Jun 11 00:36:12 2011
@@ -20,9 +20,6 @@ package org.apache.hadoop.hdfs;
import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Op.WRITE_BLOCK;
import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.SUCCESS;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -46,11 +43,15 @@ import org.apache.hadoop.fs.FSDataOutput
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto.Builder;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
@@ -59,6 +60,7 @@ import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.StringUtils;
import org.junit.Test;
/**
@@ -93,6 +95,9 @@ public class TestDataTransferProtocol ex
if ( testDescription != null ) {
LOG.info("Testing : " + testDescription);
}
+ LOG.info("Going to write:" +
+ StringUtils.byteToHexString(sendBuf.toByteArray()));
+
sock = new Socket();
sock.connect(dnAddr, HdfsConstants.READ_TIMEOUT);
sock.setSoTimeout(HdfsConstants.READ_TIMEOUT);
@@ -113,10 +118,11 @@ public class TestDataTransferProtocol ex
}
throw eof;
}
- for (int i=0; i<retBuf.length; i++) {
- System.out.print(retBuf[i]);
- }
- System.out.println(":");
+
+ LOG.info("Received: " +
+ StringUtils.byteToHexString(retBuf));
+ LOG.info("Expected: " +
+ StringUtils.byteToHexString(recvBuf.toByteArray()));
if (eofExpected) {
throw new IOException("Did not recieve IOException when an exception " +
@@ -162,12 +168,22 @@ public class TestDataTransferProtocol ex
sendOut.writeInt(0); // zero checksum
//ok finally write a block with 0 len
- SUCCESS.write(recvOut);
- Text.writeString(recvOut, "");
- new PipelineAck(100, new Status[]{SUCCESS}).write(recvOut);
+ sendResponse(Status.SUCCESS, "", recvOut);
+ new PipelineAck(100, new Status[]{Status.SUCCESS}).write(recvOut);
sendRecvData(description, false);
}
+ private void sendResponse(Status status, String firstBadLink,
+ DataOutputStream out)
+ throws IOException {
+ Builder builder = BlockOpResponseProto.newBuilder().setStatus(status);
+ if (firstBadLink != null) {
+ builder.setFirstBadLink(firstBadLink);
+ }
+ builder.build()
+ .writeDelimitedTo(out);
+ }
+
private void testWrite(ExtendedBlock block, BlockConstructionStage stage, long newGS,
String description, Boolean eofExcepted) throws IOException {
sendBuf.reset();
@@ -176,12 +192,11 @@ public class TestDataTransferProtocol ex
stage, newGS, block.getNumBytes(), block.getNumBytes(), "cl", null,
new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN);
if (eofExcepted) {
- ERROR.write(recvOut);
+ sendResponse(Status.ERROR, null, recvOut);
sendRecvData(description, true);
} else if (stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
//ok finally write a block with 0 len
- SUCCESS.write(recvOut);
- Text.writeString(recvOut, ""); // first bad node
+ sendResponse(Status.SUCCESS, "", recvOut);
sendRecvData(description, false);
} else {
writeZeroLengthPacket(block, description);
@@ -369,7 +384,7 @@ public class TestDataTransferProtocol ex
// bad bytes per checksum
sendOut.writeInt(-1-random.nextInt(oneMil));
recvBuf.reset();
- ERROR.write(recvOut);
+ sendResponse(Status.ERROR, null, recvOut);
sendRecvData("wrong bytesPerChecksum while writing", true);
sendBuf.reset();
@@ -389,9 +404,8 @@ public class TestDataTransferProtocol ex
-1 - random.nextInt(oneMil)); // bad datalen
hdr.write(sendOut);
- SUCCESS.write(recvOut);
- Text.writeString(recvOut, "");
- new PipelineAck(100, new Status[]{ERROR}).write(recvOut);
+ sendResponse(Status.SUCCESS, "", recvOut);
+ new PipelineAck(100, new Status[]{Status.ERROR}).write(recvOut);
sendRecvData("negative DATA_CHUNK len while writing block " + newBlockId,
true);
@@ -415,9 +429,8 @@ public class TestDataTransferProtocol ex
sendOut.writeInt(0); // zero checksum
sendOut.flush();
//ok finally write a block with 0 len
- SUCCESS.write(recvOut);
- Text.writeString(recvOut, "");
- new PipelineAck(100, new Status[]{SUCCESS}).write(recvOut);
+ sendResponse(Status.SUCCESS, "", recvOut);
+ new PipelineAck(100, new Status[]{Status.SUCCESS}).write(recvOut);
sendRecvData("Writing a zero len block blockid " + newBlockId, false);
/* Test OP_READ_BLOCK */
@@ -450,7 +463,7 @@ public class TestDataTransferProtocol ex
// negative length is ok. Datanode assumes we want to read the whole block.
recvBuf.reset();
- SUCCESS.write(recvOut);
+ sendResponse(Status.SUCCESS, null, recvOut);
sendBuf.reset();
DataTransferProtocol.Sender.opReadBlock(sendOut, blk, 0L,
-1 - random.nextInt(oneMil), "cl", BlockTokenSecretManager.DUMMY_TOKEN);
@@ -459,7 +472,7 @@ public class TestDataTransferProtocol ex
// length is more than size of block.
recvBuf.reset();
- ERROR.write(recvOut);
+ sendResponse(Status.ERROR, null, recvOut);
sendBuf.reset();
DataTransferProtocol.Sender.opReadBlock(sendOut, blk, 0L,
fileLen + 1, "cl", BlockTokenSecretManager.DUMMY_TOKEN);
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java?rev=1134492&r1=1134491&r2=1134492&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java Sat Jun 11 00:36:12 2011
@@ -17,8 +17,6 @@
*/
package org.apache.hadoop.hdfs.server.datanode;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.*;
-
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -46,6 +44,8 @@ import org.apache.hadoop.hdfs.protocol.D
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.Util;
@@ -264,7 +264,9 @@ public class TestBlockReplacement extend
// receiveResponse
DataInputStream reply = new DataInputStream(sock.getInputStream());
- return DataTransferProtocol.Status.read(reply) == SUCCESS;
+ BlockOpResponseProto proto =
+ BlockOpResponseProto.parseDelimitedFrom(reply);
+ return proto.getStatus() == Status.SUCCESS;
}
/**
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestTransferRbw.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestTransferRbw.java?rev=1134492&r1=1134491&r2=1134492&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestTransferRbw.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestTransferRbw.java Sat Jun 11 00:36:12 2011
@@ -33,6 +33,8 @@ import org.apache.hadoop.hdfs.protocol.D
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.log4j.Level;
@@ -121,9 +123,9 @@ public class TestTransferRbw {
//transfer RBW
final ExtendedBlock b = new ExtendedBlock(bpid, oldrbw.getBlockId(), oldrbw.getBytesAcked(),
oldrbw.getGenerationStamp());
- final DataTransferProtocol.Status s = DFSTestUtil.transferRbw(
+ final BlockOpResponseProto s = DFSTestUtil.transferRbw(
b, fs.getClient(), oldnodeinfo, newnodeinfo);
- Assert.assertEquals(DataTransferProtocol.Status.SUCCESS, s);
+ Assert.assertEquals(Status.SUCCESS, s.getStatus());
}
//check new rbw
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlockTokenWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlockTokenWithDFS.java?rev=1134492&r1=1134491&r2=1134492&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlockTokenWithDFS.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlockTokenWithDFS.java Sat Jun 11 00:36:12 2011
@@ -43,6 +43,7 @@ import org.apache.hadoop.fs.FSDataOutput
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.token.*;
+import org.apache.hadoop.util.StringUtils;
import org.apache.log4j.Level;
import junit.framework.TestCase;
@@ -142,7 +143,8 @@ public class TestBlockTokenWithDFS exten
+ "when it is expected to be valid", shouldSucceed);
return;
}
- fail("OP_READ_BLOCK failed due to reasons other than access token");
+ fail("OP_READ_BLOCK failed due to reasons other than access token: "
+ + StringUtils.stringifyException(ex));
} finally {
if (s != null) {
try {
Added: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/util/TestExactSizeInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/util/TestExactSizeInputStream.java?rev=1134492&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/util/TestExactSizeInputStream.java (added)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/util/TestExactSizeInputStream.java Sat Jun 11 00:36:12 2011
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.util;
+
+import static org.junit.Assert.*;
+
+import java.io.ByteArrayInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.hdfs.util.ExactSizeInputStream;
+import org.junit.Test;
+
+public class TestExactSizeInputStream {
+ @Test
+ public void testBasicsReadSingle() throws IOException {
+ ExactSizeInputStream s = new ExactSizeInputStream(byteStream("hello"), 3);
+ assertEquals(3, s.available());
+
+ assertEquals((int)'h', s.read());
+ assertEquals((int)'e', s.read());
+ assertEquals((int)'l', s.read());
+ assertEquals(-1, s.read());
+ assertEquals(0, s.available());
+ }
+
+ @Test
+ public void testBasicsReadArray() throws IOException {
+ ExactSizeInputStream s = new ExactSizeInputStream(byteStream("hello"), 3);
+ assertEquals(3, s.available());
+
+ byte[] buf = new byte[10];
+
+ assertEquals(2, s.read(buf, 0, 2));
+ assertEquals('h', buf[0]);
+ assertEquals('e', buf[1]);
+
+ assertEquals(1, s.read(buf, 0, 2));
+ assertEquals('l', buf[0]);
+
+ assertEquals(-1, s.read(buf, 0, 2));
+ }
+
+ @Test
+ public void testBasicsSkip() throws IOException {
+ ExactSizeInputStream s = new ExactSizeInputStream(byteStream("hello"), 3);
+ assertEquals(3, s.available());
+
+ assertEquals(2, s.skip(2));
+ assertEquals(1, s.skip(2));
+ assertEquals(0, s.skip(2));
+ }
+
+ @Test
+ public void testReadNotEnough() throws IOException {
+ // Ask for 5 bytes, only has 2
+ ExactSizeInputStream s = new ExactSizeInputStream(byteStream("he"), 5);
+ assertEquals(2, s.available());
+
+ assertEquals((int)'h', s.read());
+ assertEquals((int)'e', s.read());
+ try {
+ s.read();
+ fail("Read when should be out of data");
+ } catch (EOFException e) {
+ // expected
+ }
+ }
+
+ @Test
+ public void testSkipNotEnough() throws IOException {
+ // Ask for 5 bytes, only has 2
+ ExactSizeInputStream s = new ExactSizeInputStream(byteStream("he"), 5);
+ assertEquals(2, s.skip(3));
+ try {
+ s.skip(1);
+ fail("Skip when should be out of data");
+ } catch (EOFException e) {
+ // expected
+ }
+ }
+
+ @Test
+ public void testReadArrayNotEnough() throws IOException {
+ // Ask for 5 bytes, only has 2
+ ExactSizeInputStream s = new ExactSizeInputStream(byteStream("he"), 5);
+ byte[] buf = new byte[10];
+ assertEquals(2, s.read(buf, 0, 5));
+ try {
+ s.read(buf, 2, 3);
+ fail("Read buf when should be out of data");
+ } catch (EOFException e) {
+ // expected
+ }
+ }
+
+ @Test
+ public void testMark() throws IOException {
+ ExactSizeInputStream s = new ExactSizeInputStream(byteStream("he"), 5);
+ assertFalse(s.markSupported());
+ try {
+ s.mark(1);
+ fail("Mark should not succeed");
+ } catch (UnsupportedOperationException uoe) {
+ // expected
+ }
+ }
+
+ private static InputStream byteStream(String data) {
+ return new ByteArrayInputStream(data.getBytes());
+ }
+}