You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2012/10/19 04:28:07 UTC
svn commit: r1399950 [8/27] - in
/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project: ./
hadoop-hdfs-httpfs/ hadoop-hdfs-httpfs/dev-support/
hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/
hadoop-hdfs-httpfs/src/main/java/org/apach...
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/LeaseRenewer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/LeaseRenewer.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/LeaseRenewer.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/LeaseRenewer.java Fri Oct 19 02:25:55 2012
@@ -34,6 +34,7 @@ import org.apache.hadoop.hdfs.protocol.H
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Time;
/**
* <p>
@@ -156,9 +157,6 @@ class LeaseRenewer {
}
}
- private final String clienNamePostfix = DFSUtil.getRandom().nextInt()
- + "_" + Thread.currentThread().getId();
-
/** The time in milliseconds that the map became empty. */
private long emptyTime = Long.MAX_VALUE;
/** A fixed lease renewal time period in milliseconds */
@@ -212,11 +210,6 @@ class LeaseRenewer {
return renewal;
}
- /** @return the client name for the given id. */
- String getClientName(final String id) {
- return "DFSClient_" + id + "_" + clienNamePostfix;
- }
-
/** Add a client. */
private synchronized void addClient(final DFSClient dfsc) {
for(DFSClient c : dfsclients) {
@@ -270,6 +263,11 @@ class LeaseRenewer {
synchronized boolean isRunning() {
return daemon != null && daemon.isAlive();
}
+
+ /** Does this renewer have nothing to renew? */
+ public boolean isEmpty() {
+ return dfsclients.isEmpty();
+ }
/** Used only by tests */
synchronized String getDaemonName() {
@@ -279,7 +277,7 @@ class LeaseRenewer {
/** Is the empty period longer than the grace period? */
private synchronized boolean isRenewerExpired() {
return emptyTime != Long.MAX_VALUE
- && System.currentTimeMillis() - emptyTime > gracePeriod;
+ && Time.now() - emptyTime > gracePeriod;
}
synchronized void put(final String src, final DFSOutputStream out,
@@ -330,6 +328,9 @@ class LeaseRenewer {
dfsc.removeFileBeingWritten(src);
synchronized(this) {
+ if (dfsc.isFilesBeingWrittenEmpty()) {
+ dfsclients.remove(dfsc);
+ }
//update emptyTime if necessary
if (emptyTime == Long.MAX_VALUE) {
for(DFSClient c : dfsclients) {
@@ -339,7 +340,7 @@ class LeaseRenewer {
}
}
//discover the first time that all file-being-written maps are empty.
- emptyTime = System.currentTimeMillis();
+ emptyTime = Time.now();
}
}
}
@@ -354,7 +355,7 @@ class LeaseRenewer {
}
if (emptyTime == Long.MAX_VALUE) {
//discover the first time that the client list is empty.
- emptyTime = System.currentTimeMillis();
+ emptyTime = Time.now();
}
}
@@ -427,10 +428,9 @@ class LeaseRenewer {
* when the lease period is half over.
*/
private void run(final int id) throws InterruptedException {
- for(long lastRenewed = System.currentTimeMillis();
- clientsRunning() && !Thread.interrupted();
+ for(long lastRenewed = Time.now(); !Thread.interrupted();
Thread.sleep(getSleepPeriod())) {
- final long elapsed = System.currentTimeMillis() - lastRenewed;
+ final long elapsed = Time.now() - lastRenewed;
if (elapsed >= getRenewalTime()) {
try {
renew();
@@ -438,7 +438,7 @@ class LeaseRenewer {
LOG.debug("Lease renewer daemon for " + clientsString()
+ " with renew id " + id + " executed");
}
- lastRenewed = System.currentTimeMillis();
+ lastRenewed = Time.now();
} catch (SocketTimeoutException ie) {
LOG.warn("Failed to renew lease for " + clientsString() + " for "
+ (elapsed/1000) + " seconds. Aborting ...", ie);
@@ -468,6 +468,13 @@ class LeaseRenewer {
//no longer the current daemon or expired
return;
}
+
+ // if no clients are in running state or there are no more clients
+ // registered with this renewer, stop the daemon after the grace
+ // period.
+ if (!clientsRunning() && emptyTime == Long.MAX_VALUE) {
+ emptyTime = Time.now();
+ }
}
}
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java Fri Oct 19 02:25:55 2012
@@ -57,6 +57,7 @@ import org.apache.hadoop.io.retry.Failov
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.io.retry.RetryUtils;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
@@ -68,7 +69,6 @@ import org.apache.hadoop.security.author
import org.apache.hadoop.tools.GetUserMappingsProtocol;
import com.google.common.base.Preconditions;
-import com.google.protobuf.ServiceException;
/**
* Create proxy objects to communicate with a remote NN. All remote access to an
@@ -243,99 +243,20 @@ public class NameNodeProxies {
return new NamenodeProtocolTranslatorPB(proxy);
}
- /**
- * Return the default retry policy used in RPC.
- *
- * If dfs.client.retry.policy.enabled == false, use TRY_ONCE_THEN_FAIL.
- *
- * Otherwise, first unwrap ServiceException if possible, and then
- * (1) use multipleLinearRandomRetry for
- * - SafeModeException, or
- * - IOException other than RemoteException, or
- * - ServiceException; and
- * (2) use TRY_ONCE_THEN_FAIL for
- * - non-SafeMode RemoteException, or
- * - non-IOException.
- *
- * Note that dfs.client.retry.max < 0 is not allowed.
- */
- private static RetryPolicy getDefaultRpcRetryPolicy(Configuration conf) {
- final RetryPolicy multipleLinearRandomRetry = getMultipleLinearRandomRetry(conf);
- if (LOG.isDebugEnabled()) {
- LOG.debug("multipleLinearRandomRetry = " + multipleLinearRandomRetry);
- }
- if (multipleLinearRandomRetry == null) {
- //no retry
- return RetryPolicies.TRY_ONCE_THEN_FAIL;
- } else {
- return new RetryPolicy() {
- @Override
- public RetryAction shouldRetry(Exception e, int retries, int failovers,
- boolean isMethodIdempotent) throws Exception {
- if (e instanceof ServiceException) {
- //unwrap ServiceException
- final Throwable cause = e.getCause();
- if (cause != null && cause instanceof Exception) {
- e = (Exception)cause;
- }
- }
-
- //see (1) and (2) in the javadoc of this method.
- final RetryPolicy p;
- if (e instanceof RemoteException) {
- final RemoteException re = (RemoteException)e;
- p = SafeModeException.class.getName().equals(re.getClassName())?
- multipleLinearRandomRetry: RetryPolicies.TRY_ONCE_THEN_FAIL;
- } else if (e instanceof IOException || e instanceof ServiceException) {
- p = multipleLinearRandomRetry;
- } else { //non-IOException
- p = RetryPolicies.TRY_ONCE_THEN_FAIL;
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("RETRY " + retries + ") policy="
- + p.getClass().getSimpleName() + ", exception=" + e);
- }
- LOG.info("RETRY " + retries + ") policy="
- + p.getClass().getSimpleName() + ", exception=" + e);
- return p.shouldRetry(e, retries, failovers, isMethodIdempotent);
- }
- };
- }
- }
-
- /**
- * Return the MultipleLinearRandomRetry policy specified in the conf,
- * or null if the feature is disabled.
- * If the policy is specified in the conf but the policy cannot be parsed,
- * the default policy is returned.
- *
- * Conf property: N pairs of sleep-time and number-of-retries
- * dfs.client.retry.policy = "s1,n1,s2,n2,..."
- */
- private static RetryPolicy getMultipleLinearRandomRetry(Configuration conf) {
- final boolean enabled = conf.getBoolean(
- DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY,
- DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_DEFAULT);
- if (!enabled) {
- return null;
- }
-
- final String policy = conf.get(
- DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_KEY,
- DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT);
-
- final RetryPolicy r = RetryPolicies.MultipleLinearRandomRetry.parseCommaSeparatedString(policy);
- return r != null? r: RetryPolicies.MultipleLinearRandomRetry.parseCommaSeparatedString(
- DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT);
- }
-
private static ClientProtocol createNNProxyWithClientProtocol(
InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
boolean withRetries) throws IOException {
RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class);
- final RetryPolicy defaultPolicy = getDefaultRpcRetryPolicy(conf);
+ final RetryPolicy defaultPolicy =
+ RetryUtils.getDefaultRetryPolicy(
+ conf,
+ DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY,
+ DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_DEFAULT,
+ DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_KEY,
+ DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT,
+ SafeModeException.class);
+
final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class);
ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy(
ClientNamenodeProtocolPB.class, version, address, ugi, conf,
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java Fri Oct 19 02:25:55 2012
@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FSInputCheck
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
@@ -458,7 +459,9 @@ public class RemoteBlockReader extends F
void sendReadResult(Socket sock, Status statusCode) {
assert !sentStatusCode : "already sent status code to " + sock;
try {
- RemoteBlockReader2.writeReadResult(sock, statusCode);
+ RemoteBlockReader2.writeReadResult(
+ NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT),
+ statusCode);
sentStatusCode = true;
} catch (IOException e) {
// It's ok not to be able to send this. But something is probably wrong.
@@ -484,4 +487,11 @@ public class RemoteBlockReader extends F
throw new UnsupportedOperationException("readDirect unsupported in RemoteBlockReader");
}
+ @Override
+ public IOStreamPair getStreams() {
+ // This class doesn't support encryption, which is the only thing this
+ // method is used for. See HDFS-3637.
+ return null;
+ }
+
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java Fri Oct 19 02:25:55 2012
@@ -32,26 +32,23 @@ import java.nio.channels.ReadableByteCha
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
-import org.apache.hadoop.hdfs.util.DirectBufferPool;
-import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.SocketInputWrapper;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
-import com.google.common.base.Preconditions;
-
/**
* This is a wrapper around connection to datanode
* and understands checksum, offset etc.
@@ -83,15 +80,15 @@ public class RemoteBlockReader2 impleme
static final Log LOG = LogFactory.getLog(RemoteBlockReader2.class);
- Socket dnSock; //for now just sending the status code (e.g. checksumOk) after the read.
+ Socket dnSock;
+ // for now just sending the status code (e.g. checksumOk) after the read.
+ private IOStreamPair ioStreams;
private final ReadableByteChannel in;
private DataChecksum checksum;
- private PacketHeader curHeader;
- private ByteBuffer curPacketBuf = null;
+ private PacketReceiver packetReceiver = new PacketReceiver(true);
private ByteBuffer curDataSlice = null;
-
/** offset in block of the last chunk received */
private long lastSeqNo = -1;
@@ -99,10 +96,6 @@ public class RemoteBlockReader2 impleme
private long startOffset;
private final String filename;
- private static DirectBufferPool bufferPool = new DirectBufferPool();
- private final ByteBuffer headerBuf = ByteBuffer.allocate(
- PacketHeader.PKT_HEADER_LEN);
-
private final int bytesPerChecksum;
private final int checksumSize;
@@ -126,7 +119,7 @@ public class RemoteBlockReader2 impleme
public synchronized int read(byte[] buf, int off, int len)
throws IOException {
- if (curPacketBuf == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
+ if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
readNextPacket();
}
if (curDataSlice.remaining() == 0) {
@@ -143,7 +136,7 @@ public class RemoteBlockReader2 impleme
@Override
public int read(ByteBuffer buf) throws IOException {
- if (curPacketBuf == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
+ if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
readNextPacket();
}
if (curDataSlice.remaining() == 0) {
@@ -161,11 +154,13 @@ public class RemoteBlockReader2 impleme
}
private void readNextPacket() throws IOException {
- Preconditions.checkState(curHeader == null || !curHeader.isLastPacketInBlock());
-
//Read packet headers.
- readPacketHeader();
+ packetReceiver.receiveNextPacket(in);
+ PacketHeader curHeader = packetReceiver.getHeader();
+ curDataSlice = packetReceiver.getDataSlice();
+ assert curDataSlice.capacity() == curHeader.getDataLen();
+
if (LOG.isTraceEnabled()) {
LOG.trace("DFSClient readNextPacket got header " + curHeader);
}
@@ -179,17 +174,20 @@ public class RemoteBlockReader2 impleme
if (curHeader.getDataLen() > 0) {
int chunks = 1 + (curHeader.getDataLen() - 1) / bytesPerChecksum;
int checksumsLen = chunks * checksumSize;
- int bufsize = checksumsLen + curHeader.getDataLen();
+
+ assert packetReceiver.getChecksumSlice().capacity() == checksumsLen :
+ "checksum slice capacity=" + packetReceiver.getChecksumSlice().capacity() +
+ " checksumsLen=" + checksumsLen;
- resetPacketBuffer(checksumsLen, curHeader.getDataLen());
-
lastSeqNo = curHeader.getSeqno();
- if (bufsize > 0) {
- readChannelFully(in, curPacketBuf);
- curPacketBuf.flip();
- if (verifyChecksum) {
- verifyPacketChecksums();
- }
+ if (verifyChecksum && curDataSlice.remaining() > 0) {
+ // N.B.: the checksum error offset reported here is actually
+ // relative to the start of the block, not the start of the file.
+ // This is slightly misleading, but preserves the behavior from
+ // the older BlockReader.
+ checksum.verifyChunkedSums(curDataSlice,
+ packetReceiver.getChecksumSlice(),
+ filename, curHeader.getOffsetInBlock());
}
bytesNeededToFinish -= curHeader.getDataLen();
}
@@ -206,46 +204,13 @@ public class RemoteBlockReader2 impleme
if (bytesNeededToFinish <= 0) {
readTrailingEmptyPacket();
if (verifyChecksum) {
- sendReadResult(dnSock, Status.CHECKSUM_OK);
+ sendReadResult(Status.CHECKSUM_OK);
} else {
- sendReadResult(dnSock, Status.SUCCESS);
- }
- }
- }
-
- private void verifyPacketChecksums() throws ChecksumException {
- // N.B.: the checksum error offset reported here is actually
- // relative to the start of the block, not the start of the file.
- // This is slightly misleading, but preserves the behavior from
- // the older BlockReader.
- checksum.verifyChunkedSums(curDataSlice, curPacketBuf,
- filename, curHeader.getOffsetInBlock());
- }
-
- private static void readChannelFully(ReadableByteChannel ch, ByteBuffer buf)
- throws IOException {
- while (buf.remaining() > 0) {
- int n = ch.read(buf);
- if (n < 0) {
- throw new IOException("Premature EOF reading from " + ch);
+ sendReadResult(Status.SUCCESS);
}
}
}
-
- private void resetPacketBuffer(int checksumsLen, int dataLen) {
- int packetLen = checksumsLen + dataLen;
- if (curPacketBuf == null ||
- curPacketBuf.capacity() < packetLen) {
- returnPacketBufToPool();
- curPacketBuf = bufferPool.getBuffer(packetLen);
- }
- curPacketBuf.position(checksumsLen);
- curDataSlice = curPacketBuf.slice();
- curDataSlice.limit(dataLen);
- curPacketBuf.clear();
- curPacketBuf.limit(checksumsLen + dataLen);
- }
-
+
@Override
public synchronized long skip(long n) throws IOException {
/* How can we make sure we don't throw a ChecksumException, at least
@@ -266,23 +231,14 @@ public class RemoteBlockReader2 impleme
return nSkipped;
}
- private void readPacketHeader() throws IOException {
- headerBuf.clear();
- readChannelFully(in, headerBuf);
- headerBuf.flip();
- if (curHeader == null) curHeader = new PacketHeader();
- curHeader.readFields(headerBuf);
- }
-
private void readTrailingEmptyPacket() throws IOException {
if (LOG.isTraceEnabled()) {
LOG.trace("Reading empty packet at end of read");
}
- headerBuf.clear();
- readChannelFully(in, headerBuf);
- headerBuf.flip();
- PacketHeader trailer = new PacketHeader();
- trailer.readFields(headerBuf);
+
+ packetReceiver.receiveNextPacket(in);
+
+ PacketHeader trailer = packetReceiver.getHeader();
if (!trailer.isLastPacketInBlock() ||
trailer.getDataLen() != 0) {
throw new IOException("Expected empty end-of-read packet! Header: " +
@@ -292,9 +248,11 @@ public class RemoteBlockReader2 impleme
protected RemoteBlockReader2(String file, String bpid, long blockId,
ReadableByteChannel in, DataChecksum checksum, boolean verifyChecksum,
- long startOffset, long firstChunkOffset, long bytesToRead, Socket dnSock) {
+ long startOffset, long firstChunkOffset, long bytesToRead, Socket dnSock,
+ IOStreamPair ioStreams) {
// Path is used only for printing block and file information in debug
this.dnSock = dnSock;
+ this.ioStreams = ioStreams;
this.in = in;
this.checksum = checksum;
this.verifyChecksum = verifyChecksum;
@@ -313,7 +271,7 @@ public class RemoteBlockReader2 impleme
@Override
public synchronized void close() throws IOException {
- returnPacketBufToPool();
+ packetReceiver.close();
startOffset = -1;
checksum = null;
@@ -324,24 +282,6 @@ public class RemoteBlockReader2 impleme
// in will be closed when its Socket is closed.
}
- @Override
- protected void finalize() throws Throwable {
- try {
- // just in case it didn't get closed, we
- // may as well still try to return the buffer
- returnPacketBufToPool();
- } finally {
- super.finalize();
- }
- }
-
- private void returnPacketBufToPool() {
- if (curPacketBuf != null) {
- bufferPool.returnBuffer(curPacketBuf);
- curPacketBuf = null;
- }
- }
-
/**
* Take the socket used to talk to the DN.
*/
@@ -369,24 +309,23 @@ public class RemoteBlockReader2 impleme
* closing our connection (which we will re-open), but won't affect
* data correctness.
*/
- void sendReadResult(Socket sock, Status statusCode) {
- assert !sentStatusCode : "already sent status code to " + sock;
+ void sendReadResult(Status statusCode) {
+ assert !sentStatusCode : "already sent status code to " + dnSock;
try {
- writeReadResult(sock, statusCode);
+ writeReadResult(ioStreams.out, statusCode);
sentStatusCode = true;
} catch (IOException e) {
// It's ok not to be able to send this. But something is probably wrong.
LOG.info("Could not send read status (" + statusCode + ") to datanode " +
- sock.getInetAddress() + ": " + e.getMessage());
+ dnSock.getInetAddress() + ": " + e.getMessage());
}
}
/**
* Serialize the actual read result on the wire.
*/
- static void writeReadResult(Socket sock, Status statusCode)
+ static void writeReadResult(OutputStream out, Status statusCode)
throws IOException {
- OutputStream out = NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT);
ClientReadStatusProto.newBuilder()
.setStatus(statusCode)
@@ -434,25 +373,32 @@ public class RemoteBlockReader2 impleme
* @param clientName Client name
* @return New BlockReader instance, or null on error.
*/
- public static BlockReader newBlockReader( Socket sock, String file,
+ public static BlockReader newBlockReader(Socket sock, String file,
ExtendedBlock block,
Token<BlockTokenIdentifier> blockToken,
long startOffset, long len,
int bufferSize, boolean verifyChecksum,
- String clientName)
+ String clientName,
+ DataEncryptionKey encryptionKey,
+ IOStreamPair ioStreams)
throws IOException {
+
+ ReadableByteChannel ch;
+ if (ioStreams.in instanceof SocketInputWrapper) {
+ ch = ((SocketInputWrapper)ioStreams.in).getReadableByteChannel();
+ } else {
+ ch = (ReadableByteChannel) ioStreams.in;
+ }
+
// in and out will be closed when sock is closed (by the caller)
final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
- NetUtils.getOutputStream(sock,
- HdfsServerConstants.WRITE_TIMEOUT)));
+ ioStreams.out));
new Sender(out).readBlock(block, blockToken, clientName, startOffset, len);
//
- // Get bytes in block, set streams
+ // Get bytes in block
//
- SocketInputWrapper sin = NetUtils.getInputStream(sock);
- ReadableByteChannel ch = sin.getReadableByteChannel();
- DataInputStream in = new DataInputStream(sin);
+ DataInputStream in = new DataInputStream(ioStreams.in);
BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
vintPrefixed(in));
@@ -474,7 +420,8 @@ public class RemoteBlockReader2 impleme
}
return new RemoteBlockReader2(file, block.getBlockPoolId(), block.getBlockId(),
- ch, checksum, verifyChecksum, startOffset, firstChunkOffset, len, sock);
+ ch, checksum, verifyChecksum, startOffset, firstChunkOffset, len, sock,
+ ioStreams);
}
static void checkSuccess(
@@ -498,4 +445,9 @@ public class RemoteBlockReader2 impleme
}
}
}
+
+ @Override
+ public IOStreamPair getStreams() {
+ return ioStreams;
+ }
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/SocketCache.java Fri Oct 19 02:25:55 2012
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs;
+import java.io.Closeable;
import java.net.Socket;
import java.net.SocketAddress;
@@ -25,53 +26,135 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
+import java.io.IOException;
import com.google.common.base.Preconditions;
import com.google.common.collect.LinkedListMultimap;
import org.apache.commons.logging.Log;
+import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.StringUtils;
/**
- * A cache of sockets.
+ * A cache of input stream sockets to Data Node.
*/
class SocketCache {
- static final Log LOG = LogFactory.getLog(SocketCache.class);
+ private static final Log LOG = LogFactory.getLog(SocketCache.class);
- private final LinkedListMultimap<SocketAddress, Socket> multimap;
- private final int capacity;
+ @InterfaceAudience.Private
+ static class SocketAndStreams implements Closeable {
+ public final Socket sock;
+ public final IOStreamPair ioStreams;
+ long createTime;
+
+ public SocketAndStreams(Socket s, IOStreamPair ioStreams) {
+ this.sock = s;
+ this.ioStreams = ioStreams;
+ this.createTime = System.currentTimeMillis();
+ }
+
+ @Override
+ public void close() {
+ if (ioStreams != null) {
+ IOUtils.closeStream(ioStreams.in);
+ IOUtils.closeStream(ioStreams.out);
+ }
+ IOUtils.closeSocket(sock);
+ }
- /**
- * Create a SocketCache with the given capacity.
- * @param capacity Max cache size.
- */
- public SocketCache(int capacity) {
- multimap = LinkedListMultimap.create();
- this.capacity = capacity;
- if (capacity <= 0) {
- LOG.debug("SocketCache disabled in configuration.");
+ public long getCreateTime() {
+ return this.createTime;
}
}
+ private Daemon daemon;
+ /** Cached sockets and their streams, keyed by remote socket address. */
+ private static LinkedListMultimap<SocketAddress, SocketAndStreams> multimap =
+ LinkedListMultimap.create();
+ private static int capacity;
+ private static long expiryPeriod;
+ private static SocketCache scInstance = new SocketCache();
+ private static boolean isInitedOnce = false;
+
+ public static synchronized SocketCache getInstance(int c, long e) {
+ // capacity is only initialized once
+ if (isInitedOnce == false) {
+ capacity = c;
+ expiryPeriod = e;
+
+ if (capacity == 0 ) {
+ LOG.info("SocketCache disabled.");
+ }
+ else if (expiryPeriod == 0) {
+ throw new IllegalStateException("Cannot initialize expiryPeriod to " +
+ expiryPeriod + "when cache is enabled.");
+ }
+ isInitedOnce = true;
+ } else { //already initialized once
+ if (capacity != c || expiryPeriod != e) {
+ LOG.info("capacity and expiry periods already set to " + capacity +
+ " and " + expiryPeriod + " respectively. Cannot set it to " + c +
+ " and " + e);
+ }
+ }
+
+ return scInstance;
+ }
+
+ private boolean isDaemonStarted() {
+ return (daemon == null)? false: true;
+ }
+
+ private synchronized void startExpiryDaemon() {
+ // start daemon only if not already started
+ if (isDaemonStarted() == true) {
+ return;
+ }
+
+ daemon = new Daemon(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ SocketCache.this.run();
+ } catch(InterruptedException e) {
+ //noop
+ } finally {
+ SocketCache.this.clear();
+ }
+ }
+
+ @Override
+ public String toString() {
+ return String.valueOf(SocketCache.this);
+ }
+ });
+ daemon.start();
+ }
+
/**
* Get a cached socket to the given address.
* @param remote Remote address the socket is connected to.
* @return A socket with unknown state, possibly closed underneath. Or null.
*/
- public synchronized Socket get(SocketAddress remote) {
+ public synchronized SocketAndStreams get(SocketAddress remote) {
+
if (capacity <= 0) { // disabled
return null;
}
-
- List<Socket> socklist = multimap.get(remote);
- if (socklist == null) {
+
+ List<SocketAndStreams> sockStreamList = multimap.get(remote);
+ if (sockStreamList == null) {
return null;
}
- Iterator<Socket> iter = socklist.iterator();
+ Iterator<SocketAndStreams> iter = sockStreamList.iterator();
while (iter.hasNext()) {
- Socket candidate = iter.next();
+ SocketAndStreams candidate = iter.next();
iter.remove();
- if (!candidate.isClosed()) {
+ if (!candidate.sock.isClosed()) {
return candidate;
}
}
@@ -82,14 +165,17 @@ class SocketCache {
* Give an unused socket to the cache.
* @param sock socket not used by anyone.
*/
- public synchronized void put(Socket sock) {
+ public synchronized void put(Socket sock, IOStreamPair ioStreams) {
+
+ Preconditions.checkNotNull(sock);
+ SocketAndStreams s = new SocketAndStreams(sock, ioStreams);
if (capacity <= 0) {
// Cache disabled.
- IOUtils.closeSocket(sock);
+ s.close();
return;
}
-
- Preconditions.checkNotNull(sock);
+
+ startExpiryDaemon();
SocketAddress remoteAddr = sock.getRemoteSocketAddress();
if (remoteAddr == null) {
@@ -102,7 +188,7 @@ class SocketCache {
if (capacity == multimap.size()) {
evictOldest();
}
- multimap.put(remoteAddr, sock);
+ multimap.put(remoteAddr, s);
}
public synchronized int size() {
@@ -110,32 +196,67 @@ class SocketCache {
}
/**
+ * Evict and close sockets older than expiry period from the cache.
+ */
+ private synchronized void evictExpired(long expiryPeriod) {
+ while (multimap.size() != 0) {
+ Iterator<Entry<SocketAddress, SocketAndStreams>> iter =
+ multimap.entries().iterator();
+ Entry<SocketAddress, SocketAndStreams> entry = iter.next();
+ // if oldest socket expired, remove it
+ if (entry == null ||
+ System.currentTimeMillis() - entry.getValue().getCreateTime() <
+ expiryPeriod) {
+ break;
+ }
+ iter.remove();
+ SocketAndStreams s = entry.getValue();
+ s.close();
+ }
+ }
+
+ /**
* Evict the oldest entry in the cache.
*/
private synchronized void evictOldest() {
- Iterator<Entry<SocketAddress, Socket>> iter =
+ Iterator<Entry<SocketAddress, SocketAndStreams>> iter =
multimap.entries().iterator();
if (!iter.hasNext()) {
- throw new IllegalStateException("Cannot evict from empty cache!");
+ throw new IllegalStateException("Cannot evict from empty cache! " +
+ "capacity: " + capacity);
}
- Entry<SocketAddress, Socket> entry = iter.next();
+ Entry<SocketAddress, SocketAndStreams> entry = iter.next();
iter.remove();
- Socket sock = entry.getValue();
- IOUtils.closeSocket(sock);
+ SocketAndStreams s = entry.getValue();
+ s.close();
}
/**
- * Empty the cache, and close all sockets.
+ * Periodically check the cache and expire the entries
+ * older than expiryPeriod milliseconds
*/
- public synchronized void clear() {
- for (Socket sock : multimap.values()) {
- IOUtils.closeSocket(sock);
+ private void run() throws InterruptedException {
+ for(long lastExpiryTime = System.currentTimeMillis();
+ !Thread.interrupted();
+ Thread.sleep(expiryPeriod)) {
+ final long elapsed = System.currentTimeMillis() - lastExpiryTime;
+ if (elapsed >= expiryPeriod) {
+ evictExpired(expiryPeriod);
+ lastExpiryTime = System.currentTimeMillis();
+ }
}
- multimap.clear();
+ clear();
+ throw new InterruptedException("Daemon Interrupted");
}
- protected void finalize() {
- clear();
+ /**
+ * Empty the cache, and close all sockets.
+ */
+ private synchronized void clear() {
+ for (SocketAndStreams sockAndStream : multimap.values()) {
+ sockAndStream.close();
+ }
+ multimap.clear();
}
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/Block.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/Block.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/Block.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/Block.java Fri Oct 19 02:25:55 2012
@@ -40,6 +40,7 @@ public class Block implements Writable,
WritableFactories.setFactory
(Block.class,
new WritableFactory() {
+ @Override
public Writable newInstance() { return new Block(); }
});
}
@@ -146,6 +147,7 @@ public class Block implements Writable,
/**
*/
+ @Override
public String toString() {
return getBlockName() + "_" + getGenerationStamp();
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java Fri Oct 19 02:25:55 2012
@@ -148,10 +148,12 @@ public class BlockListAsLongs implements
this.currentReplicaState = null;
}
+ @Override
public boolean hasNext() {
return currentBlockIndex < getNumberOfBlocks();
}
+ @Override
public Block next() {
block.set(blockId(currentBlockIndex),
blockLength(currentBlockIndex),
@@ -161,6 +163,7 @@ public class BlockListAsLongs implements
return block;
}
+ @Override
public void remove() {
throw new UnsupportedOperationException("Sorry. can't remove.");
}
@@ -178,6 +181,7 @@ public class BlockListAsLongs implements
/**
* Returns an iterator over blocks in the block report.
*/
+ @Override
public Iterator<Block> iterator() {
return getBlockReportIterator();
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java Fri Oct 19 02:25:55 2012
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.protocol;
import java.io.IOException;
+import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@@ -106,4 +107,21 @@ public interface ClientDatanodeProtocol
*/
BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock block,
Token<BlockTokenIdentifier> token) throws IOException;
+
+ /**
+ * Retrieves volume location information about a list of blocks on a datanode.
+ * This is in the form of an opaque {@link VolumeId} for each configured
+ * data directory, which is not guaranteed to be the same across DN restarts.
+ *
+ * @param blocks
+ * list of blocks on the local datanode
+ * @param tokens
+ * block access tokens corresponding to the requested blocks
+ * @return an HdfsBlocksMetadata that associates {@link ExtendedBlock}s with
+ * data directories
+ * @throws IOException
+ * if datanode is unreachable, or replica is not found on datanode
+ */
+ HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks,
+ List<Token<BlockTokenIdentifier>> tokens) throws IOException;
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Fri Oct 19 02:25:55 2012
@@ -33,8 +33,6 @@ import org.apache.hadoop.fs.UnresolvedLi
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants.UpgradeAction;
-import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
import org.apache.hadoop.io.EnumSetWritable;
@@ -44,6 +42,7 @@ import org.apache.hadoop.security.Access
import org.apache.hadoop.security.KerberosInfo;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenInfo;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
@@ -668,6 +667,18 @@ public interface ClientProtocol {
*/
public void saveNamespace() throws AccessControlException, IOException;
+
+ /**
+ * Roll the edit log.
+ * Requires superuser privileges.
+ *
+ * @throws AccessControlException if the superuser privilege is violated
+ * @throws IOException if log roll fails
+ * @return the txid of the new segment
+ */
+ @Idempotent
+ public long rollEdits() throws AccessControlException, IOException;
+
/**
* Enable/Disable restore failed storage.
* <p>
@@ -694,16 +705,6 @@ public interface ClientProtocol {
public void finalizeUpgrade() throws IOException;
/**
- * Report distributed upgrade progress or force current upgrade to proceed.
- *
- * @param action {@link HdfsConstants.UpgradeAction} to perform
- * @return upgrade status information or null if no upgrades are in progress
- * @throws IOException
- */
- public UpgradeStatusReport distributedUpgradeProgress(UpgradeAction action)
- throws IOException;
-
- /**
* @return CorruptFileBlocks, containing a list of corrupt files (with
* duplicates if there is more than one corrupt block in a file)
* and a cookie
@@ -941,4 +942,11 @@ public interface ClientProtocol {
*/
public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
throws IOException;
+
+ /**
+ * @return encryption key so a client can encrypt data sent via the
+ * DataTransferProtocol to/from DataNodes.
+ * @throws IOException
+ */
+ public DataEncryptionKey getDataEncryptionKey() throws IOException;
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DSQuotaExceededException.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DSQuotaExceededException.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DSQuotaExceededException.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DSQuotaExceededException.java Fri Oct 19 02:25:55 2012
@@ -37,6 +37,7 @@ public class DSQuotaExceededException ex
super(quota, count);
}
+ @Override
public String getMessage() {
String msg = super.getMessage();
if (msg == null) {
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java Fri Oct 19 02:25:55 2012
@@ -37,12 +37,12 @@ import org.apache.hadoop.classification.
public class DatanodeID implements Comparable<DatanodeID> {
public static final DatanodeID[] EMPTY_ARRAY = {};
- protected String ipAddr; // IP address
- protected String hostName; // hostname
- protected String storageID; // unique per cluster storageID
- protected int xferPort; // data streaming port
- protected int infoPort; // info server port
- protected int ipcPort; // IPC server port
+ private String ipAddr; // IP address
+ private String hostName; // hostname
+ private String storageID; // unique per cluster storageID
+ private int xferPort; // data streaming port
+ private int infoPort; // info server port
+ private int ipcPort; // IPC server port
public DatanodeID(DatanodeID from) {
this(from.getIpAddr(),
@@ -104,7 +104,7 @@ public class DatanodeID implements Compa
/**
* @return IP:ipcPort string
*/
- public String getIpcAddr() {
+ private String getIpcAddr() {
return ipAddr + ":" + ipcPort;
}
@@ -123,6 +123,29 @@ public class DatanodeID implements Compa
}
/**
+ * @return hostname:ipcPort
+ */
+ private String getIpcAddrWithHostname() {
+ return hostName + ":" + ipcPort;
+ }
+
+ /**
+ * @param useHostname true to use the DN hostname, use the IP otherwise
+ * @return name:xferPort
+ */
+ public String getXferAddr(boolean useHostname) {
+ return useHostname ? getXferAddrWithHostname() : getXferAddr();
+ }
+
+ /**
+ * @param useHostname true to use the DN hostname, use the IP otherwise
+ * @return name:ipcPort
+ */
+ public String getIpcAddr(boolean useHostname) {
+ return useHostname ? getIpcAddrWithHostname() : getIpcAddr();
+ }
+
+ /**
* @return data storage ID.
*/
public String getStorageID() {
@@ -150,6 +173,7 @@ public class DatanodeID implements Compa
return ipcPort;
}
+ @Override
public boolean equals(Object to) {
if (this == to) {
return true;
@@ -161,10 +185,12 @@ public class DatanodeID implements Compa
storageID.equals(((DatanodeID)to).getStorageID()));
}
+ @Override
public int hashCode() {
return getXferAddr().hashCode()^ storageID.hashCode();
}
+ @Override
public String toString() {
return getXferAddr();
}
@@ -187,6 +213,7 @@ public class DatanodeID implements Compa
* @param that
* @return as specified by Comparable
*/
+ @Override
public int compareTo(DatanodeID that) {
return getXferAddr().compareTo(that.getXferAddr());
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java Fri Oct 19 02:25:55 2012
@@ -27,6 +27,7 @@ import org.apache.hadoop.net.NetworkTopo
import org.apache.hadoop.net.Node;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Time;
/**
* This class extends the primary identifier of a Datanode with ephemeral
@@ -36,13 +37,13 @@ import org.apache.hadoop.util.StringUtil
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class DatanodeInfo extends DatanodeID implements Node {
- protected long capacity;
- protected long dfsUsed;
- protected long remaining;
- protected long blockPoolUsed;
- protected long lastUpdate;
- protected int xceiverCount;
- protected String location = NetworkTopology.DEFAULT_RACK;
+ private long capacity;
+ private long dfsUsed;
+ private long remaining;
+ private long blockPoolUsed;
+ private long lastUpdate;
+ private int xceiverCount;
+ private String location = NetworkTopology.DEFAULT_RACK;
// Datanode administrative states
public enum AdminStates {
@@ -56,6 +57,7 @@ public class DatanodeInfo extends Datano
this.value = v;
}
+ @Override
public String toString() {
return value;
}
@@ -79,8 +81,7 @@ public class DatanodeInfo extends Datano
this.lastUpdate = from.getLastUpdate();
this.xceiverCount = from.getXceiverCount();
this.location = from.getNetworkLocation();
- this.adminState = from.adminState;
- this.hostName = from.hostName;
+ this.adminState = from.getAdminState();
}
public DatanodeInfo(DatanodeID nodeID) {
@@ -126,6 +127,7 @@ public class DatanodeInfo extends Datano
}
/** Network location name */
+ @Override
public String getName() {
return getXferAddr();
}
@@ -200,9 +202,11 @@ public class DatanodeInfo extends Datano
}
/** network location */
+ @Override
public synchronized String getNetworkLocation() {return location;}
/** Sets the network location */
+ @Override
public synchronized void setNetworkLocation(String location) {
this.location = NodeBase.normalize(location);
}
@@ -317,7 +321,24 @@ public class DatanodeInfo extends Datano
}
return adminState;
}
-
+
+ /**
+ * Check if the datanode is in a stale state. If
+ * the namenode has not received a heartbeat message from a
+ * datanode for at least staleInterval (default value is
+ * {@link DFSConfigKeys#DFS_NAMENODE_STALE_DATANODE_INTERVAL_MILLI_DEFAULT}),
+ * the datanode will be treated as a stale node.
+ *
+ * @param staleInterval
+ * the time interval for marking the node as stale. If the last
+ * update time is beyond the given time interval, the node will be
+ * marked as stale.
+ * @return true if the node is stale
+ */
+ public boolean isStale(long staleInterval) {
+ return (Time.now() - lastUpdate) >= staleInterval;
+ }
+
/**
* Sets the admin state of this node.
*/
@@ -334,13 +355,17 @@ public class DatanodeInfo extends Datano
private transient Node parent; //its parent
/** Return this node's parent */
+ @Override
public Node getParent() { return parent; }
+ @Override
public void setParent(Node parent) {this.parent = parent;}
/** Return this node's level in the tree.
* E.g. the root of a tree returns 0 and its children return 1
*/
+ @Override
public int getLevel() { return level; }
+ @Override
public void setLevel(int level) {this.level = level;}
@Override
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java Fri Oct 19 02:25:55 2012
@@ -60,7 +60,7 @@ public class HdfsConstants {
public static int MAX_PATH_LENGTH = 8000;
public static int MAX_PATH_DEPTH = 1000;
- // TODO mb@media-style.com: should be conf injected?
+ // TODO should be conf injected?
public static final int DEFAULT_DATA_SOCKET_SIZE = 128 * 1024;
public static final int IO_FILE_BUFFER_SIZE = new HdfsConfiguration().getInt(
DFSConfigKeys.IO_FILE_BUFFER_SIZE_KEY,
@@ -85,16 +85,6 @@ public class HdfsConstants {
public static final long INVALID_TXID = -12345;
/**
- * Distributed upgrade actions:
- *
- * 1. Get upgrade status. 2. Get detailed upgrade status. 3. Proceed with the
- * upgrade if it is stuck, no matter what the status is.
- */
- public static enum UpgradeAction {
- GET_STATUS, DETAILED_STATUS, FORCE_PROCEED;
- }
-
- /**
* URI Scheme for hdfs://namenode/ URIs.
*/
public static final String HDFS_URI_SCHEME = "hdfs";
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsProtoUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsProtoUtil.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsProtoUtil.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsProtoUtil.java Fri Oct 19 02:25:55 2012
@@ -29,6 +29,7 @@ import org.apache.hadoop.hdfs.protocol.p
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.util.ExactSizeInputStream;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.security.token.Token;
import com.google.common.collect.Lists;
@@ -155,6 +156,14 @@ public abstract class HdfsProtoUtil {
return ret;
}
+ public static DataChecksum.Type fromProto(HdfsProtos.ChecksumTypeProto type) {
+ return DataChecksum.Type.valueOf(type.name());
+ }
+
+ public static HdfsProtos.ChecksumTypeProto toProto(DataChecksum.Type type) {
+ return HdfsProtos.ChecksumTypeProto.valueOf(type.name());
+ }
+
public static InputStream vintPrefixed(final InputStream input)
throws IOException {
final int firstByte = input.read();
@@ -167,4 +176,4 @@ public abstract class HdfsProtoUtil {
return new ExactSizeInputStream(input, size);
}
-}
\ No newline at end of file
+}
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java Fri Oct 19 02:25:55 2012
@@ -113,6 +113,7 @@ public class LocatedBlocks {
Comparator<LocatedBlock> comp =
new Comparator<LocatedBlock>() {
// Returns 0 iff a is inside b or b is inside a
+ @Override
public int compare(LocatedBlock a, LocatedBlock b) {
long aBeg = a.getStartOffset();
long bBeg = b.getStartOffset();
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/NSQuotaExceededException.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/NSQuotaExceededException.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/NSQuotaExceededException.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/NSQuotaExceededException.java Fri Oct 19 02:25:55 2012
@@ -36,6 +36,7 @@ public final class NSQuotaExceededExcept
super(quota, count);
}
+ @Override
public String getMessage() {
String msg = super.getMessage();
if (msg == null) {
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/QuotaExceededException.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/QuotaExceededException.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/QuotaExceededException.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/QuotaExceededException.java Fri Oct 19 02:25:55 2012
@@ -58,6 +58,7 @@ public class QuotaExceededException exte
this.pathName = path;
}
+ @Override
public String getMessage() {
return super.getMessage();
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java Fri Oct 19 02:25:55 2012
@@ -24,16 +24,13 @@ import org.apache.hadoop.hdfs.protocol.E
import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto.ChecksumType;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ChecksumTypeProto;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
-import com.google.common.collect.BiMap;
-import com.google.common.collect.ImmutableBiMap;
-
/**
* Static utilities for dealing with the protocol buffers used by the
@@ -42,19 +39,6 @@ import com.google.common.collect.Immutab
@InterfaceAudience.Private
@InterfaceStability.Evolving
public abstract class DataTransferProtoUtil {
-
- /**
- * Map between the internal DataChecksum identifiers and the protobuf-
- * generated identifiers on the wire.
- */
- static BiMap<Integer, ChecksumProto.ChecksumType> checksumTypeMap =
- ImmutableBiMap.<Integer, ChecksumProto.ChecksumType>builder()
- .put(DataChecksum.CHECKSUM_CRC32, ChecksumProto.ChecksumType.CRC32)
- .put(DataChecksum.CHECKSUM_CRC32C, ChecksumProto.ChecksumType.CRC32C)
- .put(DataChecksum.CHECKSUM_NULL, ChecksumProto.ChecksumType.NULL)
- .build();
-
-
static BlockConstructionStage fromProto(
OpWriteBlockProto.BlockConstructionStage stage) {
return BlockConstructionStage.valueOf(BlockConstructionStage.class,
@@ -68,7 +52,7 @@ public abstract class DataTransferProtoU
}
public static ChecksumProto toProto(DataChecksum checksum) {
- ChecksumType type = checksumTypeMap.get(checksum.getChecksumType());
+ ChecksumTypeProto type = ChecksumTypeProto.valueOf(checksum.getChecksumType().name());
if (type == null) {
throw new IllegalArgumentException(
"Can't convert checksum to protobuf: " + checksum);
@@ -84,7 +68,7 @@ public abstract class DataTransferProtoU
if (proto == null) return null;
int bytesPerChecksum = proto.getBytesPerChecksum();
- int type = checksumTypeMap.inverse().get(proto.getType());
+ DataChecksum.Type type = DataChecksum.Type.valueOf(proto.getType().name());
return DataChecksum.newDataChecksum(type, bytesPerChecksum);
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java Fri Oct 19 02:25:55 2012
@@ -27,14 +27,31 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PacketHeaderProto;
import org.apache.hadoop.hdfs.util.ByteBufferOutputStream;
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Shorts;
+import com.google.common.primitives.Ints;
+import com.google.protobuf.InvalidProtocolBufferException;
+
/**
* Header data for each packet that goes through the read/write pipelines.
+ * Includes all of the information about the packet, excluding checksums and
+ * actual data.
+ *
+ * This data includes:
+ * - the offset in bytes into the HDFS block of the data in this packet
+ * - the sequence number of this packet in the pipeline
+ * - whether or not this is the last packet in the pipeline
+ * - the length of the data in this packet
+ * - whether or not this packet should be synced by the DNs.
+ *
+ * When serialized, this header is written out as a protocol buffer, preceded
+ * by a 4-byte integer representing the full packet length, and a 2-byte short
+ * representing the header length.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class PacketHeader {
- /** Header size for a packet */
- private static final int PROTO_SIZE =
+ private static final int MAX_PROTO_SIZE =
PacketHeaderProto.newBuilder()
.setOffsetInBlock(0)
.setSeqno(0)
@@ -42,8 +59,10 @@ public class PacketHeader {
.setDataLen(0)
.setSyncBlock(false)
.build().getSerializedSize();
- public static final int PKT_HEADER_LEN =
- 6 + PROTO_SIZE;
+ public static final int PKT_LENGTHS_LEN =
+ Ints.BYTES + Shorts.BYTES;
+ public static final int PKT_MAX_HEADER_LEN =
+ PKT_LENGTHS_LEN + MAX_PROTO_SIZE;
private int packetLen;
private PacketHeaderProto proto;
@@ -54,13 +73,25 @@ public class PacketHeader {
public PacketHeader(int packetLen, long offsetInBlock, long seqno,
boolean lastPacketInBlock, int dataLen, boolean syncBlock) {
this.packetLen = packetLen;
- proto = PacketHeaderProto.newBuilder()
+ Preconditions.checkArgument(packetLen >= Ints.BYTES,
+ "packet len %s should always be at least 4 bytes",
+ packetLen);
+
+ PacketHeaderProto.Builder builder = PacketHeaderProto.newBuilder()
.setOffsetInBlock(offsetInBlock)
.setSeqno(seqno)
.setLastPacketInBlock(lastPacketInBlock)
- .setDataLen(dataLen)
- .setSyncBlock(syncBlock)
- .build();
+ .setDataLen(dataLen);
+
+ if (syncBlock) {
+ // Only set syncBlock if it is specified.
+ // This is wire-incompatible with Hadoop 2.0.0-alpha due to HDFS-3721
+ // because it changes the length of the packet header, and BlockReceiver
+ // in that version did not support variable-length headers.
+ builder.setSyncBlock(syncBlock);
+ }
+
+ proto = builder.build();
}
public int getDataLen() {
@@ -90,10 +121,16 @@ public class PacketHeader {
@Override
public String toString() {
return "PacketHeader with packetLen=" + packetLen +
- "Header data: " +
+ " header data: " +
proto.toString();
}
+ public void setFieldsFromData(
+ int packetLen, byte[] headerData) throws InvalidProtocolBufferException {
+ this.packetLen = packetLen;
+ proto = PacketHeaderProto.parseFrom(headerData);
+ }
+
public void readFields(ByteBuffer buf) throws IOException {
packetLen = buf.getInt();
short protoLen = buf.getShort();
@@ -110,14 +147,21 @@ public class PacketHeader {
proto = PacketHeaderProto.parseFrom(data);
}
+ /**
+ * @return the number of bytes necessary to write out this header,
+ * including the length-prefixing of the payload and header
+ */
+ public int getSerializedSize() {
+ return PKT_LENGTHS_LEN + proto.getSerializedSize();
+ }
/**
* Write the header into the buffer.
* This requires that getSerializedSize() bytes (at most PKT_MAX_HEADER_LEN) are available.
*/
public void putInBuffer(final ByteBuffer buf) {
- assert proto.getSerializedSize() == PROTO_SIZE
- : "Expected " + (PROTO_SIZE) + " got: " + proto.getSerializedSize();
+ assert proto.getSerializedSize() <= MAX_PROTO_SIZE
+ : "Expected " + (MAX_PROTO_SIZE) + " got: " + proto.getSerializedSize();
try {
buf.putInt(packetLen);
buf.putShort((short) proto.getSerializedSize());
@@ -128,12 +172,18 @@ public class PacketHeader {
}
public void write(DataOutputStream out) throws IOException {
- assert proto.getSerializedSize() == PROTO_SIZE
- : "Expected " + (PROTO_SIZE) + " got: " + proto.getSerializedSize();
+ assert proto.getSerializedSize() <= MAX_PROTO_SIZE
+ : "Expected " + (MAX_PROTO_SIZE) + " got: " + proto.getSerializedSize();
out.writeInt(packetLen);
out.writeShort(proto.getSerializedSize());
proto.writeTo(out);
}
+
+ public byte[] getBytes() {
+ ByteBuffer buf = ByteBuffer.allocate(getSerializedSize());
+ putInBuffer(buf);
+ return buf.array();
+ }
/**
* Perform a sanity check on the packet, returning true if it is sane.
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java Fri Oct 19 02:25:55 2012
@@ -30,6 +30,8 @@ import org.apache.hadoop.hdfs.protocol.p
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import com.google.protobuf.TextFormat;
+
/** Pipeline Acknowledgment **/
@InterfaceAudience.Private
@InterfaceStability.Evolving
@@ -120,6 +122,6 @@ public class PipelineAck {
@Override //Object
public String toString() {
- return proto.toString();
+ return TextFormat.shortDebugString(proto);
}
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java Fri Oct 19 02:25:55 2012
@@ -38,10 +38,10 @@ import org.apache.hadoop.hdfs.protocol.p
@InterfaceAudience.Private
@InterfaceStability.Evolving
public abstract class Receiver implements DataTransferProtocol {
- protected final DataInputStream in;
-
- /** Create a receiver for DataTransferProtocol with a socket. */
- protected Receiver(final DataInputStream in) {
+ protected DataInputStream in;
+
+ /** Initialize a receiver for DataTransferProtocol with the given input stream. */
+ protected void initialize(final DataInputStream in) {
this.in = in;
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java Fri Oct 19 02:25:55 2012
@@ -18,19 +18,31 @@
package org.apache.hadoop.hdfs.protocolPB;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DeleteBlockPoolRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DeleteBlockPoolResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsResponseProto.Builder;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.RefreshNamenodesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.RefreshNamenodesResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockTokenIdentifierProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.security.token.Token;
+import com.google.protobuf.ByteString;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
@@ -106,4 +118,38 @@ public class ClientDatanodeProtocolServe
.setLocalPath(resp.getBlockPath()).setLocalMetaPath(resp.getMetaPath())
.build();
}
+
+ @Override // converts the protobuf request, calls impl.getHdfsBlocksMetadata, and builds the response proto
+ public GetHdfsBlockLocationsResponseProto getHdfsBlockLocations(
+ RpcController controller, GetHdfsBlockLocationsRequestProto request)
+ throws ServiceException {
+ HdfsBlocksMetadata resp;
+ try {
+ // Convert the request's block and token protos into the Lists passed to the implementation
+ List<ExtendedBlock> blocks =
+ new ArrayList<ExtendedBlock>(request.getBlocksCount());
+ for (ExtendedBlockProto b : request.getBlocksList()) {
+ blocks.add(PBHelper.convert(b));
+ }
+ List<Token<BlockTokenIdentifier>> tokens =
+ new ArrayList<Token<BlockTokenIdentifier>>(request.getTokensCount());
+ for (BlockTokenIdentifierProto b : request.getTokensList()) {
+ tokens.add(PBHelper.convert(b));
+ }
+ // Call the real implementation (controller is not used in this method)
+ resp = impl.getHdfsBlocksMetadata(blocks, tokens);
+ } catch (IOException e) {
+ throw new ServiceException(e); // IOException is rethrown wrapped in a ServiceException
+ }
+ List<ByteString> volumeIdsByteStrings =
+ new ArrayList<ByteString>(resp.getVolumeIds().size());
+ for (byte[] b : resp.getVolumeIds()) { // copy each byte[] volume id into a ByteString
+ volumeIdsByteStrings.add(ByteString.copyFrom(b));
+ }
+ // Build and return the response: all volume ids plus the volume indexes taken from resp
+ Builder builder = GetHdfsBlockLocationsResponseProto.newBuilder();
+ builder.addAllVolumeIds(volumeIdsByteStrings);
+ builder.addAllVolumeIndexes(resp.getVolumeIndexes());
+ return builder.build();
+ }
}
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java Fri Oct 19 02:25:55 2012
@@ -20,6 +20,8 @@ package org.apache.hadoop.hdfs.protocolP
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
import javax.net.SocketFactory;
@@ -33,12 +35,17 @@ import org.apache.hadoop.hdfs.protocol.B
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DeleteBlockPoolRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetBlockLocalPathInfoResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetHdfsBlockLocationsResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.RefreshNamenodesRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockTokenIdentifierProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
@@ -50,6 +57,7 @@ import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
+import com.google.protobuf.ByteString;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
@@ -73,10 +81,10 @@ public class ClientDatanodeProtocolTrans
RefreshNamenodesRequestProto.newBuilder().build();
public ClientDatanodeProtocolTranslatorPB(DatanodeID datanodeid,
- Configuration conf, int socketTimeout, LocatedBlock locatedBlock)
- throws IOException {
+ Configuration conf, int socketTimeout, boolean connectToDnViaHostname,
+ LocatedBlock locatedBlock) throws IOException {
rpcProxy = createClientDatanodeProtocolProxy( datanodeid, conf,
- socketTimeout, locatedBlock);
+ socketTimeout, connectToDnViaHostname, locatedBlock);
}
public ClientDatanodeProtocolTranslatorPB(InetSocketAddress addr,
@@ -90,11 +98,17 @@ public class ClientDatanodeProtocolTrans
* @param datanodeid Datanode to connect to.
* @param conf Configuration.
* @param socketTimeout Socket timeout to use.
+ * @param connectToDnViaHostname connect to the Datanode using its hostname
* @throws IOException
*/
public ClientDatanodeProtocolTranslatorPB(DatanodeID datanodeid,
- Configuration conf, int socketTimeout) throws IOException {
- InetSocketAddress addr = NetUtils.createSocketAddr(datanodeid.getIpcAddr());
+ Configuration conf, int socketTimeout, boolean connectToDnViaHostname)
+ throws IOException {
+ final String dnAddr = datanodeid.getIpcAddr(connectToDnViaHostname);
+ InetSocketAddress addr = NetUtils.createSocketAddr(dnAddr);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Connecting to datanode " + dnAddr + " addr=" + addr);
+ }
rpcProxy = createClientDatanodeProtocolProxy(addr,
UserGroupInformation.getCurrentUser(), conf,
NetUtils.getDefaultSocketFactory(conf), socketTimeout);
@@ -102,10 +116,11 @@ public class ClientDatanodeProtocolTrans
static ClientDatanodeProtocolPB createClientDatanodeProtocolProxy(
DatanodeID datanodeid, Configuration conf, int socketTimeout,
- LocatedBlock locatedBlock) throws IOException {
- InetSocketAddress addr = NetUtils.createSocketAddr(datanodeid.getIpcAddr());
+ boolean connectToDnViaHostname, LocatedBlock locatedBlock) throws IOException {
+ final String dnAddr = datanodeid.getIpcAddr(connectToDnViaHostname);
+ InetSocketAddress addr = NetUtils.createSocketAddr(dnAddr);
if (LOG.isDebugEnabled()) {
- LOG.debug("ClientDatanodeProtocol addr=" + addr);
+ LOG.debug("Connecting to datanode " + dnAddr + " addr=" + addr);
}
// Since we're creating a new UserGroupInformation here, we know that no
@@ -200,4 +215,44 @@ public class ClientDatanodeProtocolTrans
public Object getUnderlyingProxyObject() {
return rpcProxy;
}
-}
\ No newline at end of file
+
+ @Override // client side: sends blocks and tokens over rpcProxy and rebuilds an HdfsBlocksMetadata from the reply
+ public HdfsBlocksMetadata getHdfsBlocksMetadata(List<ExtendedBlock> blocks,
+ List<Token<BlockTokenIdentifier>> tokens) throws IOException {
+ // Convert the blocks and tokens to their protobuf forms via PBHelper.convert
+ List<ExtendedBlockProto> blocksProtos =
+ new ArrayList<ExtendedBlockProto>(blocks.size());
+ List<BlockTokenIdentifierProto> tokensProtos =
+ new ArrayList<BlockTokenIdentifierProto>(tokens.size());
+ for (ExtendedBlock b : blocks) {
+ blocksProtos.add(PBHelper.convert(b));
+ }
+ for (Token<BlockTokenIdentifier> t : tokens) {
+ tokensProtos.add(PBHelper.convert(t));
+ }
+ // Build the request from the converted blocks and tokens
+ GetHdfsBlockLocationsRequestProto request =
+ GetHdfsBlockLocationsRequestProto.newBuilder()
+ .addAllBlocks(blocksProtos)
+ .addAllTokens(tokensProtos)
+ .build();
+ // Send the RPC
+ GetHdfsBlockLocationsResponseProto response;
+ try {
+ response = rpcProxy.getHdfsBlockLocations(NULL_CONTROLLER, request);
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e); // rethrow whatever ProtobufHelper derives from the ServiceException
+ }
+ // Volume ids in the response, copied from ByteString into byte[]
+ List<ByteString> volumeIdsByteStrings = response.getVolumeIdsList();
+ List<byte[]> volumeIds = new ArrayList<byte[]>(volumeIdsByteStrings.size());
+ for (ByteString bs : volumeIdsByteStrings) {
+ volumeIds.add(bs.toByteArray());
+ }
+ // List of indexes into the list of volumes; expected to hold one entry per block (TODO confirm)
+ List<Integer> volumeIndexes = response.getVolumeIndexesList();
+ // Package the caller's blocks (as an array) with the returned volume ids and indexes
+ return new HdfsBlocksMetadata(blocks.toArray(new ExtendedBlock[] {}),
+ volumeIds, volumeIndexes);
+ }
+}
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java Fri Oct 19 02:25:55 2012
@@ -50,14 +50,14 @@ import org.apache.hadoop.hdfs.protocol.p
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateSymlinkResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DeleteRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DeleteResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DistributedUpgradeProgressRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DistributedUpgradeProgressResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FinalizeUpgradeRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FinalizeUpgradeResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FsyncRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FsyncResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetAdditionalDatanodeRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetAdditionalDatanodeResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDataEncryptionKeyRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDataEncryptionKeyResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsResponseProto.Builder;
@@ -103,6 +103,8 @@ import org.apache.hadoop.hdfs.protocol.p
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ReportBadBlocksResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RestoreFailedStorageRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RestoreFailedStorageResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollEditsRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollEditsResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SaveNamespaceRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SaveNamespaceResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetBalancerBandwidthRequestProto;
@@ -127,7 +129,7 @@ import org.apache.hadoop.hdfs.protocol.p
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
-import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.io.Text;
import com.google.protobuf.RpcController;
@@ -537,6 +539,20 @@ public class ClientNamenodeProtocolServe
}
}
+
+ @Override // delegates to server.rollEdits() and returns the resulting txid in the response proto
+ public RollEditsResponseProto rollEdits(RpcController controller,
+ RollEditsRequestProto request) throws ServiceException {
+ try {
+ long txid = server.rollEdits(); // neither controller nor request is read in this method
+ return RollEditsResponseProto.newBuilder()
+ .setNewSegmentTxId(txid)
+ .build();
+ } catch (IOException e) {
+ throw new ServiceException(e); // IOException is rethrown wrapped in a ServiceException
+ }
+ }
+
static final RefreshNodesResponseProto VOID_REFRESHNODES_RESPONSE =
RefreshNodesResponseProto.newBuilder().build();
@@ -568,24 +584,6 @@ public class ClientNamenodeProtocolServe
}
@Override
- public DistributedUpgradeProgressResponseProto distributedUpgradeProgress(
- RpcController controller, DistributedUpgradeProgressRequestProto req)
- throws ServiceException {
- try {
- UpgradeStatusReport result = server.distributedUpgradeProgress(PBHelper
- .convert(req.getAction()));
- DistributedUpgradeProgressResponseProto.Builder builder =
- DistributedUpgradeProgressResponseProto.newBuilder();
- if (result != null) {
- builder.setReport(PBHelper.convert(result));
- }
- return builder.build();
- } catch (IOException e) {
- throw new ServiceException(e);
- }
- }
-
- @Override
public ListCorruptFileBlocksResponseProto listCorruptFileBlocks(
RpcController controller, ListCorruptFileBlocksRequestProto req)
throws ServiceException {
@@ -830,4 +828,18 @@ public class ClientNamenodeProtocolServe
throw new ServiceException(e);
}
}
+
+ @Override // fetches the key from server.getDataEncryptionKey() and returns it converted to its proto form
+ public GetDataEncryptionKeyResponseProto getDataEncryptionKey(
+ RpcController controller, GetDataEncryptionKeyRequestProto request)
+ throws ServiceException {
+ try {
+ DataEncryptionKey encryptionKey = server.getDataEncryptionKey(); // controller and request are not read here
+ return GetDataEncryptionKeyResponseProto.newBuilder()
+ .setDataEncryptionKey(PBHelper.convert(encryptionKey)) // NOTE(review): no null check on encryptionKey; confirm the server never returns null
+ .build();
+ } catch (IOException e) {
+ throw new ServiceException(e); // IOException is rethrown wrapped in a ServiceException
+ }
+ }
}