Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2011/06/10 02:11:33 UTC
svn commit: r1134137 [1/2] - in /hadoop/hdfs/branches/HDFS-1073: ./ src/c++/libhdfs/ src/contrib/hdfsproxy/ src/java/ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/apache/hadoop/hdfs/server/common/ src/java/org...
Author: todd
Date: Fri Jun 10 00:11:32 2011
New Revision: 1134137
URL: http://svn.apache.org/viewvc?rev=1134137&view=rev
Log:
Merge trunk into HDFS-1073. Needs a few edits post-merge due to inclusion of HDFS-2003.
Added:
hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/SocketCache.java
- copied unchanged from r1134136, hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/SocketCache.java
hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
- copied unchanged from r1134136, hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestConnCache.java
- copied unchanged from r1134136, hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestConnCache.java
hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSAddressConfig.java
- copied unchanged from r1134136, hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSAddressConfig.java
hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestParallelRead.java
- copied unchanged from r1134136, hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestParallelRead.java
Modified:
hadoop/hdfs/branches/HDFS-1073/ (props changed)
hadoop/hdfs/branches/HDFS-1073/CHANGES.txt
hadoop/hdfs/branches/HDFS-1073/build.xml
hadoop/hdfs/branches/HDFS-1073/src/c++/libhdfs/ (props changed)
hadoop/hdfs/branches/HDFS-1073/src/c++/libhdfs/configure.ac
hadoop/hdfs/branches/HDFS-1073/src/contrib/hdfsproxy/ (props changed)
hadoop/hdfs/branches/HDFS-1073/src/java/ (props changed)
hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/BlockReader.java
hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSClient.java
hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSInputStream.java
hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java
hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/ (props changed)
hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/DataNodeCluster.java
hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java
hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestClientBlockVerification.java
hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java
hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java
hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java
hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java
hadoop/hdfs/branches/HDFS-1073/src/webapps/datanode/ (props changed)
hadoop/hdfs/branches/HDFS-1073/src/webapps/hdfs/ (props changed)
hadoop/hdfs/branches/HDFS-1073/src/webapps/secondary/ (props changed)
Propchange: hadoop/hdfs/branches/HDFS-1073/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jun 10 00:11:32 2011
@@ -2,4 +2,4 @@
/hadoop/hdfs/branches/HDFS-1052:987665-1095512
/hadoop/hdfs/branches/HDFS-265:796829-820463
/hadoop/hdfs/branches/branch-0.21:820487
-/hadoop/hdfs/trunk:1086482-1132839
+/hadoop/hdfs/trunk:1086482-1134136
Modified: hadoop/hdfs/branches/HDFS-1073/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/CHANGES.txt?rev=1134137&r1=1134136&r2=1134137&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/CHANGES.txt (original)
+++ hadoop/hdfs/branches/HDFS-1073/CHANGES.txt Fri Jun 10 00:11:32 2011
@@ -287,6 +287,9 @@ Trunk (unreleased changes)
IMPROVEMENTS
+ HDFS-1875. MiniDFSCluster hard-codes dfs.datanode.address to localhost
+ (Eric Payne via mattf)
+
HDFS-2019. Fix all the places where Java method File.list is used with
FileUtil.list API (Bharath Mundlapudi via mattf)
@@ -390,6 +393,7 @@ Trunk (unreleased changes)
HDFS-1295. Improve namenode restart times by short-circuiting the
first block reports from datanodes. (Matt Foley via suresh)
+ Corrected merge error in DataNode.java. (Matt Foley)
HDFS-1843. Discover file not found early for file append.
(Bharath Mundlapudi via jitendra)
@@ -485,6 +489,14 @@ Trunk (unreleased changes)
HDFS-2029. In TestWriteRead, check visible length immediately after
openning the file and fix code style. (John George via szetszwo)
+ HDFS-2040. Only build libhdfs if a flag is passed. (eli)
+
+ HDFS-1586. Add InterfaceAudience and InterfaceStability annotations to
+ MiniDFSCluster. (suresh)
+
+ HDFS-2003. Separate FSEditLog reading logic from edit log memory state
+ building logic. (Ivan Kelly via todd)
+
OPTIMIZATIONS
HDFS-1458. Improve checkpoint performance by avoiding unnecessary image
@@ -495,6 +507,9 @@ Trunk (unreleased changes)
HDFS-1826. NameNode should save image to name directories in parallel
during upgrade. (Matt Foley via hairong)
+ HDFS-941. The DFS client should cache and reuse open sockets to datanodes
+ while performing reads. (bc Wong and Todd Lipcon via todd)
+
BUG FIXES
HDFS-1449. Fix test failures - ExtendedBlock must return
@@ -942,6 +957,11 @@ Release 0.22.0 - Unreleased
HDFS-1980. Move build/webapps deeper in the build directory heirarchy
to aid eclipse users. (todd)
+ HDFS-1619. Remove AC_TYPE* from the libhdfs. (Roman Shaposhnik via eli)
+
+ HDFS-1948 Forward port 'hdfs-1520 lightweight namenode operation to
+ trigger lease recovery' (stack)
+
OPTIMIZATIONS
HDFS-1140. Speedup INode.getPathComponents. (Dmytro Molkov via shv)
Modified: hadoop/hdfs/branches/HDFS-1073/build.xml
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/build.xml?rev=1134137&r1=1134136&r2=1134137&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/build.xml (original)
+++ hadoop/hdfs/branches/HDFS-1073/build.xml Fri Jun 10 00:11:32 2011
@@ -326,10 +326,6 @@
</target>
- <target name="set-c++-libhdfs">
- <property name="libhdfs" value="true"/>
- </target>
-
<import file="${test.src.dir}/aop/build/aop.xml"/>
<target name="compile-hdfs-classes" depends="init">
@@ -1102,7 +1098,7 @@
</macro_tar>
</target>
- <target name="bin-package" depends="set-c++-libhdfs, compile, compile-c++-libhdfs, jar, jar-test, ant-tasks, jsvc"
+ <target name="bin-package" depends="compile, compile-c++-libhdfs, jar, jar-test, ant-tasks, jsvc"
description="assembles artifacts for binary target">
<mkdir dir="${dist.dir}"/>
<mkdir dir="${dist.dir}/lib"/>
@@ -1118,7 +1114,7 @@
</copy>
<copy todir="${dist.dir}/lib" includeEmptyDirs="false">
- <fileset dir="${build.dir}/c++/${build.platform}/lib">
+ <fileset dir="${build.dir}/c++/${build.platform}/lib" erroronmissingdir="false">
<include name="**"/>
</fileset>
</copy>
Propchange: hadoop/hdfs/branches/HDFS-1073/src/c++/libhdfs/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jun 10 00:11:32 2011
@@ -1,4 +1,4 @@
/hadoop/core/branches/branch-0.19/mapred/src/c++/libhdfs:713112
/hadoop/core/trunk/src/c++/libhdfs:776175-784663
/hadoop/hdfs/branches/HDFS-1052/src/c++/libhdfs:987665-1095512
-/hadoop/hdfs/trunk/src/c++/libhdfs:1086482-1132839
+/hadoop/hdfs/trunk/src/c++/libhdfs:1086482-1134136
Modified: hadoop/hdfs/branches/HDFS-1073/src/c++/libhdfs/configure.ac
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/c%2B%2B/libhdfs/configure.ac?rev=1134137&r1=1134136&r2=1134137&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/c++/libhdfs/configure.ac (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/c++/libhdfs/configure.ac Fri Jun 10 00:11:32 2011
@@ -18,7 +18,6 @@
# Autoconf input file
# $Id$
-AC_PREREQ(2.61)
AC_INIT([libhdfs], [0.1.0], omalley@apache.org)
AC_PREFIX_DEFAULT([`pwd`/../install])
AC_CONFIG_AUX_DIR([config])
@@ -122,9 +121,5 @@ AC_C_CONST
AC_C_VOLATILE
#AC_FUNC_MALLOC
AC_HEADER_STDBOOL
-AC_TYPE_INT16_T
-AC_TYPE_INT32_T
-AC_TYPE_INT64_T
-AC_TYPE_UINT16_T
AC_SUBST(PRODUCT_MK)
AC_OUTPUT
Propchange: hadoop/hdfs/branches/HDFS-1073/src/contrib/hdfsproxy/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jun 10 00:11:32 2011
@@ -3,4 +3,4 @@
/hadoop/hdfs/branches/HDFS-1052/src/contrib/hdfsproxy:987665-1095512
/hadoop/hdfs/branches/HDFS-265/src/contrib/hdfsproxy:796829-820463
/hadoop/hdfs/branches/branch-0.21/src/contrib/hdfsproxy:820487
-/hadoop/hdfs/trunk/src/contrib/hdfsproxy:1086482-1132839
+/hadoop/hdfs/trunk/src/contrib/hdfsproxy:1086482-1134136
Propchange: hadoop/hdfs/branches/HDFS-1073/src/java/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jun 10 00:11:32 2011
@@ -3,4 +3,4 @@
/hadoop/hdfs/branches/HDFS-1052/src/java:987665-1095512
/hadoop/hdfs/branches/HDFS-265/src/java:796829-820463
/hadoop/hdfs/branches/branch-0.21/src/java:820487
-/hadoop/hdfs/trunk/src/java:1086482-1132839
+/hadoop/hdfs/trunk/src/java:1086482-1134136
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/BlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/BlockReader.java?rev=1134137&r1=1134136&r2=1134137&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/BlockReader.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/BlockReader.java Fri Jun 10 00:11:32 2011
@@ -46,12 +46,29 @@ import org.apache.hadoop.security.token.
import org.apache.hadoop.util.DataChecksum;
/** This is a wrapper around connection to datanode
- * and understands checksum, offset etc
+ * and understands checksum, offset etc.
+ *
+ * Terminology:
+ * <dl>
+ * <dt>block</dt>
+ * <dd>The hdfs block, typically large (~64MB).
+ * </dd>
+ * <dt>chunk</dt>
+ * <dd>A block is divided into chunks, each comes with a checksum.
+ * We want transfers to be chunk-aligned, to be able to
+ * verify checksums.
+ * </dd>
+ * <dt>packet</dt>
+ * <dd>A grouping of chunks used for transport. It contains a
+ * header, followed by checksum data, followed by real data.
+ * </dd>
+ * </dl>
+ * Please see DataNode for the RPC specification.
*/
@InterfaceAudience.Private
public class BlockReader extends FSInputChecker {
- Socket dnSock; //for now just sending checksumOk.
+ Socket dnSock; //for now just sending the status code (e.g. checksumOk) after the read.
private DataInputStream in;
private DataChecksum checksum;
@@ -77,10 +94,12 @@ public class BlockReader extends FSInput
*/
private final long bytesNeededToFinish;
- private boolean gotEOS = false;
+ private boolean eos = false;
+ private boolean sentStatusCode = false;
byte[] skipBuf = null;
ByteBuffer checksumBytes = null;
+ /** Amount of unread data in the current received packet */
int dataLeft = 0;
/* FSInputChecker interface */
@@ -99,7 +118,7 @@ public class BlockReader extends FSInput
// This has to be set here, *before* the skip, since we can
// hit EOS during the skip, in the case that our entire read
// is smaller than the checksum chunk.
- boolean eosBefore = gotEOS;
+ boolean eosBefore = eos;
//for the first read, skip the extra bytes at the front.
if (lastChunkLen < 0 && startOffset > firstChunkOffset && len > 0) {
@@ -115,11 +134,14 @@ public class BlockReader extends FSInput
}
int nRead = super.read(buf, off, len);
-
- // if gotEOS was set in the previous read and checksum is enabled :
- if (gotEOS && !eosBefore && nRead >= 0 && needChecksum()) {
- //checksum is verified and there are no errors.
- checksumOk(dnSock);
+
+ // if eos was set in the previous read, send a status code to the DN
+ if (eos && !eosBefore && nRead >= 0) {
+ if (needChecksum()) {
+ sendReadResult(dnSock, CHECKSUM_OK);
+ } else {
+ sendReadResult(dnSock, SUCCESS);
+ }
}
return nRead;
}
@@ -191,7 +213,7 @@ public class BlockReader extends FSInput
int len, byte[] checksumBuf)
throws IOException {
// Read one chunk.
- if ( gotEOS ) {
+ if (eos) {
// Already hit EOF
return -1;
}
@@ -246,7 +268,7 @@ public class BlockReader extends FSInput
if (checksumSize > 0) {
- // How many chunks left in our stream - this is a ceiling
+ // How many chunks left in our packet - this is a ceiling
// since we may have a partial chunk at the end of the file
int chunksLeft = (dataLeft - 1) / bytesPerChecksum + 1;
@@ -307,7 +329,7 @@ public class BlockReader extends FSInput
", dataLen : " + dataLen);
}
- gotEOS = true;
+ eos = true;
}
if ( bytesToRead == 0 ) {
@@ -335,7 +357,7 @@ public class BlockReader extends FSInput
// The total number of bytes that we need to transfer from the DN is
// the amount that the user wants (bytesToRead), plus the padding at
// the beginning in order to chunk-align. Note that the DN may elect
- // to send more than this amount if the read ends mid-chunk.
+ // to send more than this amount if the read starts/ends mid-chunk.
this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset);
this.firstChunkOffset = firstChunkOffset;
@@ -364,6 +386,21 @@ public class BlockReader extends FSInput
len, bufferSize, verifyChecksum, "");
}
+ /**
+ * Create a new BlockReader specifically to satisfy a read.
+ * This method also sends the OP_READ_BLOCK request.
+ *
+ * @param sock An established Socket to the DN. The BlockReader will not close it normally
+ * @param file File location
+ * @param block The block object
+ * @param blockToken The block token for security
+ * @param startOffset The read offset, relative to block head
+ * @param len The number of bytes to read
+ * @param bufferSize The IO buffer size (not the client buffer size)
+ * @param verifyChecksum Whether to verify checksum
+ * @param clientName Client name
+ * @return New BlockReader instance, or null on error.
+ */
public static BlockReader newBlockReader( Socket sock, String file,
ExtendedBlock block,
Token<BlockTokenIdentifier> blockToken,
@@ -423,6 +460,10 @@ public class BlockReader extends FSInput
public synchronized void close() throws IOException {
startOffset = -1;
checksum = null;
+ if (dnSock != null) {
+ dnSock.close();
+ }
+
// in will be closed when its Socket is closed.
}
@@ -432,22 +473,43 @@ public class BlockReader extends FSInput
public int readAll(byte[] buf, int offset, int len) throws IOException {
return readFully(this, buf, offset, len);
}
-
- /* When the reader reaches end of the read and there are no checksum
- * errors, we send OP_STATUS_CHECKSUM_OK to datanode to inform that
- * checksum was verified and there was no error.
- */
- void checksumOk(Socket sock) {
+
+ /**
+ * Take the socket used to talk to the DN.
+ */
+ public Socket takeSocket() {
+ assert hasSentStatusCode() :
+ "BlockReader shouldn't give back sockets mid-read";
+ Socket res = dnSock;
+ dnSock = null;
+ return res;
+ }
+
+ /**
+ * Whether the BlockReader has reached the end of its input stream
+ * and successfully sent a status code back to the datanode.
+ */
+ public boolean hasSentStatusCode() {
+ return sentStatusCode;
+ }
+
+ /**
+ * When the reader reaches end of the read, it sends a status response
+ * (e.g. CHECKSUM_OK) to the DN. Failure to do so could lead to the DN
+ * closing our connection (which we will re-open), but won't affect
+ * data correctness.
+ */
+ void sendReadResult(Socket sock, DataTransferProtocol.Status statusCode) {
+ assert !sentStatusCode : "already sent status code to " + sock;
try {
OutputStream out = NetUtils.getOutputStream(sock, HdfsConstants.WRITE_TIMEOUT);
- CHECKSUM_OK.writeOutputStream(out);
+ statusCode.writeOutputStream(out);
out.flush();
+ sentStatusCode = true;
} catch (IOException e) {
- // its ok not to be able to send this.
- if(LOG.isDebugEnabled()) {
- LOG.debug("Could not write to datanode " + sock.getInetAddress() +
- ": " + e.getMessage());
- }
+ // It's ok not to be able to send this. But something is probably wrong.
+ LOG.info("Could not send read status (" + statusCode + ") to datanode " +
+ sock.getInetAddress() + ": " + e.getMessage());
}
}
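
For illustration only (not part of this commit): the "How many chunks left in our packet - this is a ceiling" comment above relies on the standard integer ceiling-division idiom. A minimal sketch of that arithmetic; the class name and the 512-byte chunk size are assumptions (512 is the usual HDFS bytes-per-checksum default), and it assumes dataLeft > 0 as in the calling code:

    // Illustrative sketch; names and the 512-byte default are assumptions,
    // not code from this commit.
    public class ChunkMath {
      public static int chunksLeft(int dataLeft, int bytesPerChecksum) {
        // Ceiling of dataLeft / bytesPerChecksum without floating point:
        // a partial chunk at the end of the packet still needs its own checksum.
        return (dataLeft - 1) / bytesPerChecksum + 1;
      }

      public static void main(String[] args) {
        System.out.println(chunksLeft(1024, 512)); // 2: two full chunks
        System.out.println(chunksLeft(1025, 512)); // 3: two full chunks plus a 1-byte tail
      }
    }
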
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1134137&r1=1134136&r2=1134137&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSClient.java Fri Jun 10 00:11:32 2011
@@ -136,6 +136,8 @@ public class DFSClient implements FSCons
final int hdfsTimeout; // timeout value for a DFS operation.
final LeaseRenewer leaserenewer;
+ final SocketCache socketCache;
+
/**
* A map from file names to {@link DFSOutputStream} objects
* that are currently being written by this client.
@@ -279,6 +281,10 @@ public class DFSClient implements FSCons
defaultReplication = (short)
conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY,
DFSConfigKeys.DFS_REPLICATION_DEFAULT);
+
+ this.socketCache = new SocketCache(
+ conf.getInt(DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY,
+ DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT));
if (nameNodeAddr != null && rpcNamenode == null) {
this.rpcNamenode = createRPCNamenode(nameNodeAddr, conf, ugi);
@@ -539,6 +545,23 @@ public class DFSClient implements FSCons
}
/**
+ * Recover a file's lease
+ * @param src a file's path
+ * @return true if the file is already closed
+ * @throws IOException
+ */
+ boolean recoverLease(String src) throws IOException {
+ checkOpen();
+
+ try {
+ return namenode.recoverLease(src, clientName);
+ } catch (RemoteException re) {
+ throw re.unwrapRemoteException(FileNotFoundException.class,
+ AccessControlException.class);
+ }
+ }
+
+ /**
* Get block location info about file
*
* getBlockLocations() returns a list of hostnames that store
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1134137&r1=1134136&r2=1134137&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Fri Jun 10 00:11:32 2011
@@ -44,6 +44,8 @@ public class DFSConfigKeys extends Commo
public static final boolean DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_DEFAULT = true;
public static final String DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY = "dfs.client.block.write.replace-datanode-on-failure.policy";
public static final String DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_DEFAULT = "DEFAULT";
+ public static final String DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY = "dfs.client.socketcache.capacity";
+ public static final int DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT = 16;
public static final String DFS_NAMENODE_BACKUP_ADDRESS_KEY = "dfs.namenode.backup.address";
public static final String DFS_NAMENODE_BACKUP_ADDRESS_DEFAULT = "localhost:50100";
@@ -83,6 +85,8 @@ public class DFSConfigKeys extends Commo
public static final String DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_DEFAULT = "ssl-client.xml";
public static final String DFS_CLIENT_HTTPS_NEED_AUTH_KEY = "dfs.client.https.need-auth";
public static final boolean DFS_CLIENT_HTTPS_NEED_AUTH_DEFAULT = false;
+ public static final String DFS_CLIENT_CACHED_CONN_RETRY_KEY = "dfs.client.cached.conn.retry";
+ public static final int DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT = 3;
public static final String DFS_NAMENODE_ACCESSTIME_PRECISION_KEY = "dfs.namenode.accesstime.precision";
public static final long DFS_NAMENODE_ACCESSTIME_PRECISION_DEFAULT = 3600000;
public static final String DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY = "dfs.namenode.replication.considerLoad";
@@ -115,6 +119,8 @@ public class DFSConfigKeys extends Commo
public static final int DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT = 0;
public static final String DFS_DATANODE_SYNCONCLOSE_KEY = "dfs.datanode.synconclose";
public static final boolean DFS_DATANODE_SYNCONCLOSE_DEFAULT = false;
+ public static final String DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY = "dfs.datanode.socket.reuse.keepalive";
+ public static final int DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT = 1000;
//Delegation token related keys
public static final String DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_KEY = "dfs.namenode.delegation.key.update-interval";
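
For illustration only (not part of this commit): the three new keys above are ordinary Hadoop Configuration settings. A minimal sketch of tuning them; the numeric values are arbitrary examples, while the key strings are taken verbatim from the constants added above:

    import org.apache.hadoop.conf.Configuration;

    public class SocketCacheConfigExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Client side: how many idle datanode sockets to keep around (default 16 above).
        conf.setInt("dfs.client.socketcache.capacity", 32);
        // Client side: how many times to retry a read over a cached socket (default 3 above).
        conf.setInt("dfs.client.cached.conn.retry", 3);
        // Datanode side: how long (ms) a DataXceiver waits for the next op on a kept-alive
        // connection (default 1000 above); 0 disables connection reuse entirely.
        conf.setInt("dfs.datanode.socket.reuse.keepalive", 2000);
      }
    }
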
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1134137&r1=1134136&r2=1134137&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSInputStream.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSInputStream.java Fri Jun 10 00:11:32 2011
@@ -43,7 +43,6 @@ import org.apache.hadoop.hdfs.protocol.L
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
-import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
@@ -56,8 +55,9 @@ import org.apache.hadoop.util.StringUtil
****************************************************************/
@InterfaceAudience.Private
public class DFSInputStream extends FSInputStream {
+ private final SocketCache socketCache;
+
private final DFSClient dfsClient;
- private Socket s = null;
private boolean closed = false;
private final String src;
@@ -92,7 +92,9 @@ public class DFSInputStream extends FSIn
private int buffersize = 1;
private byte[] oneByteBuf = new byte[1]; // used for 'int read()'
-
+
+ private int nCachedConnRetry;
+
void addToDeadNodes(DatanodeInfo dnInfo) {
deadNodes.put(dnInfo, dnInfo);
}
@@ -103,9 +105,14 @@ public class DFSInputStream extends FSIn
this.verifyChecksum = verifyChecksum;
this.buffersize = buffersize;
this.src = src;
+ this.socketCache = dfsClient.socketCache;
prefetchSize = this.dfsClient.conf.getLong(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY,
10 * dfsClient.defaultBlockSize);
- timeWindow = this.dfsClient.conf.getInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, timeWindow);
+ timeWindow = this.dfsClient.conf.getInt(
+ DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, timeWindow);
+ nCachedConnRetry = this.dfsClient.conf.getInt(
+ DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_KEY,
+ DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT);
openInfo();
}
@@ -371,15 +378,11 @@ public class DFSInputStream extends FSIn
throw new IOException("Attempted to read past end of file");
}
- if ( blockReader != null ) {
- blockReader.close();
+ // Will be getting a new BlockReader.
+ if (blockReader != null) {
+ closeBlockReader(blockReader);
blockReader = null;
}
-
- if (s != null) {
- s.close();
- s = null;
- }
//
// Connect to best DataNode for desired Block, with potential offset
@@ -400,14 +403,12 @@ public class DFSInputStream extends FSIn
InetSocketAddress targetAddr = retval.addr;
try {
- s = dfsClient.socketFactory.createSocket();
- NetUtils.connect(s, targetAddr, dfsClient.socketTimeout);
- s.setSoTimeout(dfsClient.socketTimeout);
ExtendedBlock blk = targetBlock.getBlock();
Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
- blockReader = BlockReader.newBlockReader(s, src, blk,
- accessToken,
+ blockReader = getBlockReader(
+ targetAddr, src, blk,
+ accessToken,
offsetIntoBlock, blk.getNumBytes() - offsetIntoBlock,
buffersize, verifyChecksum, dfsClient.clientName);
return chosenNode;
@@ -437,13 +438,6 @@ public class DFSInputStream extends FSIn
// Put chosen node into dead list, continue
addToDeadNodes(chosenNode);
}
- if (s != null) {
- try {
- s.close();
- } catch (IOException iex) {
- }
- }
- s = null;
}
}
}
@@ -457,16 +451,11 @@ public class DFSInputStream extends FSIn
return;
}
dfsClient.checkOpen();
-
- if ( blockReader != null ) {
- blockReader.close();
+
+ if (blockReader != null) {
+ closeBlockReader(blockReader);
blockReader = null;
}
-
- if (s != null) {
- s.close();
- s = null;
- }
super.close();
closed = true;
}
@@ -479,7 +468,7 @@ public class DFSInputStream extends FSIn
/* This is a used by regular read() and handles ChecksumExceptions.
* name readBuffer() is chosen to imply similarity to readBuffer() in
- * ChecksuFileSystem
+ * ChecksumFileSystem
*/
private synchronized int readBuffer(byte buf[], int off, int len,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
@@ -659,7 +648,6 @@ public class DFSInputStream extends FSIn
//
// Connect to best DataNode for desired Block, with potential offset
//
- Socket dn = null;
int refetchToken = 1; // only need to get a new access token once
while (true) {
@@ -673,18 +661,15 @@ public class DFSInputStream extends FSIn
BlockReader reader = null;
try {
- dn = dfsClient.socketFactory.createSocket();
- NetUtils.connect(dn, targetAddr, dfsClient.socketTimeout);
- dn.setSoTimeout(dfsClient.socketTimeout);
Token<BlockTokenIdentifier> blockToken = block.getBlockToken();
int len = (int) (end - start + 1);
-
- reader = BlockReader.newBlockReader(dn, src,
- block.getBlock(),
- blockToken,
- start, len, buffersize,
- verifyChecksum, dfsClient.clientName);
+
+ reader = getBlockReader(targetAddr, src,
+ block.getBlock(),
+ blockToken,
+ start, len, buffersize,
+ verifyChecksum, dfsClient.clientName);
int nread = reader.readAll(buf, offset, len);
if (nread != len) {
throw new IOException("truncated return from reader.read(): " +
@@ -713,8 +698,9 @@ public class DFSInputStream extends FSIn
}
}
} finally {
- IOUtils.closeStream(reader);
- IOUtils.closeSocket(dn);
+ if (reader != null) {
+ closeBlockReader(reader);
+ }
}
// Put chosen node into dead list, continue
addToDeadNodes(chosenNode);
@@ -722,6 +708,95 @@ public class DFSInputStream extends FSIn
}
/**
+ * Close the given BlockReader and cache its socket.
+ */
+ private void closeBlockReader(BlockReader reader) throws IOException {
+ if (reader.hasSentStatusCode()) {
+ Socket oldSock = reader.takeSocket();
+ socketCache.put(oldSock);
+ }
+ reader.close();
+ }
+
+ /**
+ * Retrieve a BlockReader suitable for reading.
+ * This method will reuse the cached connection to the DN if appropriate.
+ * Otherwise, it will create a new connection.
+ *
+ * @param dnAddr Address of the datanode
+ * @param file File location
+ * @param block The Block object
+ * @param blockToken The access token for security
+ * @param startOffset The read offset, relative to block head
+ * @param len The number of bytes to read
+ * @param bufferSize The IO buffer size (not the client buffer size)
+ * @param verifyChecksum Whether to verify checksum
+ * @param clientName Client name
+ * @return New BlockReader instance
+ */
+ protected BlockReader getBlockReader(InetSocketAddress dnAddr,
+ String file,
+ ExtendedBlock block,
+ Token<BlockTokenIdentifier> blockToken,
+ long startOffset,
+ long len,
+ int bufferSize,
+ boolean verifyChecksum,
+ String clientName)
+ throws IOException {
+ IOException err = null;
+ boolean fromCache = true;
+
+ // Allow retry since there is no way of knowing whether the cached socket
+ // is good until we actually use it.
+ for (int retries = 0; retries <= nCachedConnRetry && fromCache; ++retries) {
+ Socket sock = socketCache.get(dnAddr);
+ if (sock == null) {
+ fromCache = false;
+
+ sock = dfsClient.socketFactory.createSocket();
+
+ // TCP_NODELAY is crucial here because of bad interactions between
+ // Nagle's Algorithm and Delayed ACKs. With connection keepalive
+ // between the client and DN, the conversation looks like:
+ // 1. Client -> DN: Read block X
+ // 2. DN -> Client: data for block X
+ // 3. Client -> DN: Status OK (successful read)
+ // 4. Client -> DN: Read block Y
+ // The fact that step #3 and #4 are both in the client->DN direction
+ // triggers Nagling. If the DN is using delayed ACKs, this results
+ // in a delay of 40ms or more.
+ //
+ // TCP_NODELAY disables nagling and thus avoids this performance
+ // disaster.
+ sock.setTcpNoDelay(true);
+
+ NetUtils.connect(sock, dnAddr, dfsClient.socketTimeout);
+ sock.setSoTimeout(dfsClient.socketTimeout);
+ }
+
+ try {
+ // The OP_READ_BLOCK request is sent as we make the BlockReader
+ BlockReader reader =
+ BlockReader.newBlockReader(sock, file, block,
+ blockToken,
+ startOffset, len,
+ bufferSize, verifyChecksum,
+ clientName);
+ return reader;
+ } catch (IOException ex) {
+ // Our socket is no good.
+ DFSClient.LOG.debug("Error making BlockReader. Closing stale " + sock, ex);
+ sock.close();
+ err = ex;
+ }
+ }
+
+ throw err;
+ }
+
+
+ /**
* Read bytes starting from the specified position.
*
* @param position start read from this position
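
For illustration only (not part of this commit): getBlockReader() above tries a cached socket first, falls back to one fresh TCP_NODELAY connection, and keeps retrying only while the failures came from cached sockets. A simplified, self-contained sketch of that pattern; SimpleSocketCache, ReaderFactory and openWithCache are hypothetical names (the real code uses SocketCache and BlockReader.newBlockReader()), and maxRetries is assumed non-negative:

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.net.Socket;

    class CachedConnectionExample {
      interface SimpleSocketCache { Socket get(InetSocketAddress addr); }
      interface ReaderFactory<T> { T open(Socket sock) throws IOException; }

      static <T> T openWithCache(SimpleSocketCache cache, InetSocketAddress dnAddr,
                                 int maxRetries, int timeoutMs,
                                 ReaderFactory<T> factory) throws IOException {
        IOException lastFailure = null;
        boolean fromCache = true;
        // Retry only while we are burning through possibly-stale cached sockets.
        for (int retries = 0; retries <= maxRetries && fromCache; ++retries) {
          Socket sock = cache.get(dnAddr);
          if (sock == null) {
            fromCache = false;                 // this is the one fresh connection we make
            sock = new Socket();
            sock.setTcpNoDelay(true);          // avoid the Nagle/delayed-ACK stall described above
            sock.connect(dnAddr, timeoutMs);
            sock.setSoTimeout(timeoutMs);
          }
          try {
            return factory.open(sock);         // e.g. send the read request over this socket
          } catch (IOException e) {
            sock.close();                      // socket was no good; maybe the cache has another
            lastFailure = e;
          }
        }
        throw lastFailure;
      }
    }
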
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=1134137&r1=1134136&r2=1134137&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java Fri Jun 10 00:11:32 2011
@@ -218,6 +218,17 @@ public class DistributedFileSystem exten
this.verifyChecksum = verifyChecksum;
}
+ /**
+ * Start the lease recovery of a file
+ *
+ * @param f a file
+ * @return true if the file is already closed
+ * @throws IOException if an error occurs
+ */
+ public boolean recoverLease(Path f) throws IOException {
+ return dfs.recoverLease(getPathName(f));
+ }
+
@SuppressWarnings("deprecation")
@Override
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
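
For illustration only (not part of this commit): the new recoverLease(Path) API returns true when the file is already closed, so a caller that needs the file closed can simply poll. A minimal usage sketch; the polling loop and one-second sleep are an example pattern, not something prescribed by this change:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    public class RecoverLeaseExample {
      public static void main(String[] args) throws Exception {
        Path file = new Path(args[0]);
        FileSystem fs = FileSystem.get(new Configuration());
        if (!(fs instanceof DistributedFileSystem)) {
          throw new IllegalStateException("recoverLease is HDFS-specific");
        }
        DistributedFileSystem dfs = (DistributedFileSystem) fs;
        // recoverLease() returns true if the file is already closed; otherwise it
        // triggers lease recovery on the NameNode and we poll until it completes.
        while (!dfs.recoverLease(file)) {
          Thread.sleep(1000);
        }
        System.out.println("Lease recovered; file is closed: " + file);
      }
    }
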
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=1134137&r1=1134136&r2=1134137&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Fri Jun 10 00:11:32 2011
@@ -537,6 +537,17 @@ public interface ClientProtocol extends
public void renewLease(String clientName) throws AccessControlException,
IOException;
+ /**
+ * Start lease recovery.
+ * Lightweight NameNode operation to trigger lease recovery
+ *
+ * @param src path of the file to start lease recovery
+ * @param clientName name of the current client
+ * @return true if the file is already closed
+ * @throws IOException
+ */
+ public boolean recoverLease(String src, String clientName) throws IOException;
+
public int GET_STATS_CAPACITY_IDX = 0;
public int GET_STATS_USED_IDX = 1;
public int GET_STATS_REMAINING_IDX = 2;
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java?rev=1134137&r1=1134136&r2=1134137&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java Fri Jun 10 00:11:32 2011
@@ -83,6 +83,7 @@ public interface HdfsConstants {
public static int READ_TIMEOUT_EXTENSION = 5 * 1000;
public static int WRITE_TIMEOUT = 8 * 60 * 1000;
public static int WRITE_TIMEOUT_EXTENSION = 5 * 1000; //for write pipeline
+ public static int DN_KEEPALIVE_TIMEOUT = 5 * 1000;
/**
* Defines the NameNode role.
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=1134137&r1=1134136&r2=1134137&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Fri Jun 10 00:11:32 2011
@@ -69,7 +69,8 @@ class BlockSender implements java.io.Clo
private long seqno; // sequence number of packet
private boolean transferToAllowed = true;
- private boolean blockReadFully; //set when the whole block is read
+ // set once entire requested byte range has been sent to the client
+ private boolean sentEntireByteRange;
private boolean verifyChecksum; //if true, check is verified while reading
private DataTransferThrottler throttler;
private final String clientTraceFmt; // format of client trace log message
@@ -493,6 +494,8 @@ class BlockSender implements java.io.Clo
} catch (IOException e) { //socket error
throw ioeToSocketException(e);
}
+
+ sentEntireByteRange = true;
} finally {
if (clientTraceFmt != null) {
final long endTime = System.nanoTime();
@@ -501,12 +504,10 @@ class BlockSender implements java.io.Clo
close();
}
- blockReadFully = initialOffset == 0 && offset >= replicaVisibleLength;
-
return totalRead;
}
- boolean isBlockReadFully() {
- return blockReadFully;
+ boolean didSendEntireByteRange() {
+ return sentEntireByteRange;
}
}
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1134137&r1=1134136&r2=1134137&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Fri Jun 10 00:11:32 2011
@@ -912,27 +912,30 @@ public class DataNode extends Configured
* @throws IOException
*/
DatanodeCommand blockReport() throws IOException {
- // send block report
+ // send block report if timer has expired.
DatanodeCommand cmd = null;
long startTime = now();
if (startTime - lastBlockReport > blockReportInterval) {
- //
- // Send latest block report if timer has expired.
- // Get back a list of local block(s) that are obsolete
- // and can be safely GC'ed.
- //
- long brStartTime = now();
+
+ // Create block report
+ long brCreateStartTime = now();
BlockListAsLongs bReport = data.getBlockReport(blockPoolId);
+
+ // Send block report
+ long brSendStartTime = now();
cmd = bpNamenode.blockReport(bpRegistration, blockPoolId, bReport
.getBlockListAsLongs());
- long brTime = now() - brStartTime;
- metrics.addBlockReport(brTime);
- LOG.info("BlockReport of " + bReport.getNumberOfBlocks() +
- " blocks got processed in " + brTime + " msecs");
- //
+
+ // Log the block report processing stats from Datanode perspective
+ long brSendCost = now() - brSendStartTime;
+ long brCreateCost = brSendStartTime - brCreateStartTime;
+ metrics.addBlockReport(brSendCost);
+ LOG.info("BlockReport of " + bReport.getNumberOfBlocks()
+ + " blocks took " + brCreateCost + " msec to generate and "
+ + brSendCost + " msecs for RPC and NN processing");
+
// If we have sent the first block report, then wait a random
// time before we start the periodic block reports.
- //
if (resetBlockReportTime) {
lastBlockReport = startTime - R.nextInt((int)(blockReportInterval));
resetBlockReportTime = false;
@@ -1831,15 +1834,21 @@ public class DataNode extends Configured
A "PACKET" is defined further below.
The client reads data until it receives a packet with
- "LastPacketInBlock" set to true or with a zero length. If there is
- no checksum error, it replies to DataNode with OP_STATUS_CHECKSUM_OK:
-
- Client optional response at the end of data transmission of any length:
- +------------------------------+
- | 2 byte OP_STATUS_CHECKSUM_OK |
- +------------------------------+
- The DataNode always checks OP_STATUS_CHECKSUM_OK. It will close the
- client connection if it is absent.
+ "LastPacketInBlock" set to true or with a zero length. It then replies
+ to DataNode with one of the status codes:
+ - CHECKSUM_OK: All the chunk checksums have been verified
+ - SUCCESS: Data received; checksums not verified
+ - ERROR_CHECKSUM: (Currently not used) Detected invalid checksums
+
+ +---------------+
+ | 2 byte Status |
+ +---------------+
+
+ The DataNode expects all well behaved clients to send the 2 byte
+ status code. And if the the client doesn't, the DN will close the
+ connection. So the status code is optional in the sense that it
+ does not affect the correctness of the data. (And the client can
+ always reconnect.)
PACKET : Contains a packet header, checksum and data. Amount of data
======== carried is set by BUFFER_SIZE.
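
For illustration only (not part of this commit): the 2-byte status trailer described above can be pictured as a tiny optional handshake at the end of a read. A minimal sketch of writing and reading it; the class name and the numeric values are placeholders, NOT the real DataTransferProtocol.Status wire codes:

    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.EOFException;
    import java.io.IOException;

    class ReadStatusTrailerExample {
      static final short STATUS_SUCCESS     = 0; // placeholder value
      static final short STATUS_CHECKSUM_OK = 1; // placeholder value

      // Client side: after the last packet, report whether checksums were verified.
      static void writeTrailer(DataOutputStream out, boolean checksumsVerified)
          throws IOException {
        out.writeShort(checksumsVerified ? STATUS_CHECKSUM_OK : STATUS_SUCCESS);
        out.flush();
      }

      // Server side: the trailer is optional in the sense that its absence only
      // costs the client its connection, never data correctness.
      static boolean readTrailer(DataInputStream in) throws IOException {
        try {
          short status = in.readShort();
          return status == STATUS_CHECKSUM_OK || status == STATUS_SUCCESS;
        } catch (EOFException e) {
          return false; // client hung up without a status; close the connection
        }
      }
    }
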
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1134137&r1=1134136&r2=1134137&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Fri Jun 10 00:11:32 2011
@@ -27,14 +27,18 @@ import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
+import java.io.EOFException;
import java.io.IOException;
+import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
+import java.nio.channels.ClosedChannelException;
import java.util.Arrays;
import org.apache.commons.logging.Log;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -71,6 +75,7 @@ class DataXceiver extends DataTransferPr
private final DataNode datanode;
private final DataXceiverServer dataXceiverServer;
+ private int socketKeepaliveTimeout;
private long opStartTime; //the start time of receiving an Op
public DataXceiver(Socket s, DataNode datanode,
@@ -83,6 +88,10 @@ class DataXceiver extends DataTransferPr
remoteAddress = s.getRemoteSocketAddress().toString();
localAddress = s.getLocalSocketAddress().toString();
+ socketKeepaliveTimeout = datanode.getConf().getInt(
+ DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY,
+ DFSConfigKeys.DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT);
+
if (LOG.isDebugEnabled()) {
LOG.debug("Number of active connections is: "
+ datanode.getXceiverCount());
@@ -113,24 +122,60 @@ class DataXceiver extends DataTransferPr
updateCurrentThreadName("Waiting for operation");
DataInputStream in=null;
+ int opsProcessed = 0;
try {
in = new DataInputStream(
new BufferedInputStream(NetUtils.getInputStream(s),
SMALL_BUFFER_SIZE));
- final DataTransferProtocol.Op op = readOp(in);
+ int stdTimeout = s.getSoTimeout();
- // Make sure the xciver count is not exceeded
- int curXceiverCount = datanode.getXceiverCount();
- if (curXceiverCount > dataXceiverServer.maxXceiverCount) {
- throw new IOException("xceiverCount " + curXceiverCount
- + " exceeds the limit of concurrent xcievers "
- + dataXceiverServer.maxXceiverCount);
- }
+ // We process requests in a loop, and stay around for a short timeout.
+ // This optimistic behaviour allows the other end to reuse connections.
+ // Setting keepalive timeout to 0 disable this behavior.
+ do {
+ DataTransferProtocol.Op op;
+ try {
+ if (opsProcessed != 0) {
+ assert socketKeepaliveTimeout > 0;
+ s.setSoTimeout(socketKeepaliveTimeout);
+ }
+ op = readOp(in);
+ } catch (InterruptedIOException ignored) {
+ // Time out while we wait for client rpc
+ break;
+ } catch (IOException err) {
+ // Since we optimistically expect the next op, it's quite normal to get EOF here.
+ if (opsProcessed > 0 &&
+ (err instanceof EOFException || err instanceof ClosedChannelException)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Cached " + s.toString() + " closing after " + opsProcessed + " ops");
+ }
+ } else {
+ throw err;
+ }
+ break;
+ }
+
+ // restore normal timeout
+ if (opsProcessed != 0) {
+ s.setSoTimeout(stdTimeout);
+ }
- opStartTime = now();
- processOp(op, in);
+ // Make sure the xceiver count is not exceeded
+ int curXceiverCount = datanode.getXceiverCount();
+ if (curXceiverCount > dataXceiverServer.maxXceiverCount) {
+ throw new IOException("xceiverCount " + curXceiverCount
+ + " exceeds the limit of concurrent xcievers "
+ + dataXceiverServer.maxXceiverCount);
+ }
+
+ opStartTime = now();
+ processOp(op, in);
+ ++opsProcessed;
+ } while (s.isConnected() && socketKeepaliveTimeout > 0);
} catch (Throwable t) {
- LOG.error(datanode.getMachineName() + ":DataXceiver",t);
+ LOG.error(datanode.getMachineName() + ":DataXceiver, at " +
+ s.toString(), t);
} finally {
if (LOG.isDebugEnabled()) {
LOG.debug(datanode.getMachineName() + ":Number of active connections is: "
@@ -176,18 +221,36 @@ class DataXceiver extends DataTransferPr
blockSender = new BlockSender(block, startOffset, length,
true, true, false, datanode, clientTraceFmt);
} catch(IOException e) {
- ERROR.write(out);
+ sendResponse(s, ERROR, datanode.socketWriteTimeout);
throw e;
}
SUCCESS.write(out); // send op status
long read = blockSender.sendBlock(out, baseStream, null); // send data
-
+
+ if (blockSender.didSendEntireByteRange()) {
+ // If we sent the entire range, then we should expect the client
+ // to respond with a Status enum.
+ try {
+ DataTransferProtocol.Status stat = DataTransferProtocol.Status.read(in);
+ if (stat == null) {
+ LOG.warn("Client " + s.getInetAddress() + " did not send a valid status " +
+ "code after reading. Will close connection.");
+ IOUtils.closeStream(out);
+ }
+ } catch (IOException ioe) {
+ LOG.debug("Error reading client status response. Will close connection.", ioe);
+ IOUtils.closeStream(out);
+ }
+ } else {
+ IOUtils.closeStream(out);
+ }
datanode.metrics.incrBytesRead((int) read);
datanode.metrics.incrBlocksRead();
} catch ( SocketException ignored ) {
// Its ok for remote side to close the connection anytime.
datanode.metrics.incrBlocksRead();
+ IOUtils.closeStream(out);
} catch ( IOException ioe ) {
/* What exactly should we do here?
* Earlier version shutdown() datanode if there is disk error.
@@ -198,7 +261,6 @@ class DataXceiver extends DataTransferPr
StringUtils.stringifyException(ioe) );
throw ioe;
} finally {
- IOUtils.closeStream(out);
IOUtils.closeStream(blockSender);
}
@@ -690,12 +752,8 @@ class DataXceiver extends DataTransferPr
long timeout) throws IOException {
DataOutputStream reply =
new DataOutputStream(NetUtils.getOutputStream(s, timeout));
- try {
- opStatus.write(reply);
- reply.flush();
- } finally {
- IOUtils.closeStream(reply);
- }
+ opStatus.write(reply);
+ reply.flush();
}
private void checkAccess(DataOutputStream out, final boolean reply,
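
For illustration only (not part of this commit): the keepalive changes above boil down to "process one op, then wait briefly for another". A simplified sketch of that loop; KeepaliveLoopExample, readOp() and processOp() are hypothetical stand-ins for the DataXceiver internals, and it omits the xceiver-count check and ClosedChannelException handling shown above:

    import java.io.DataInputStream;
    import java.io.EOFException;
    import java.io.IOException;
    import java.io.InterruptedIOException;
    import java.net.Socket;

    class KeepaliveLoopExample {
      static void serve(Socket s, DataInputStream in, int keepaliveMs) throws IOException {
        int stdTimeout = s.getSoTimeout();
        int opsProcessed = 0;
        do {
          int op;
          try {
            if (opsProcessed != 0) {
              s.setSoTimeout(keepaliveMs);   // only wait briefly for a follow-up op
            }
            op = readOp(in);
          } catch (InterruptedIOException timedOut) {
            break;                           // no follow-up op arrived in time
          } catch (EOFException clientDone) {
            if (opsProcessed == 0) throw clientDone; // EOF before any op is an error
            break;                           // normal: client simply stopped reusing us
          }
          if (opsProcessed != 0) {
            s.setSoTimeout(stdTimeout);      // restore the normal timeout for the op itself
          }
          processOp(op, in);
          ++opsProcessed;
        } while (s.isConnected() && keepaliveMs > 0);
      }

      static int readOp(DataInputStream in) throws IOException { return in.readByte(); }
      static void processOp(int op, DataInputStream in) { /* handle the request */ }
    }
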
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1134137&r1=1134136&r2=1134137&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Fri Jun 10 00:11:32 2011
@@ -18,9 +18,7 @@
package org.apache.hadoop.hdfs.server.namenode;
import java.io.BufferedInputStream;
-import java.io.DataInput;
import java.io.DataInputStream;
-import java.io.DataOutput;
import java.io.EOFException;
import java.io.FilterInputStream;
import java.io.IOException;
@@ -29,27 +27,15 @@ import java.util.Arrays;
import java.util.zip.CheckedInputStream;
import java.util.zip.Checksum;
-import org.apache.hadoop.fs.ChecksumException;
-import org.apache.hadoop.fs.Options.Rename;
-import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
-import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import static org.apache.hadoop.hdfs.server.common.Util.now;
-import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableFactories;
-import org.apache.hadoop.io.WritableFactory;
-import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.*;
public class FSEditLogLoader {
private final FSNamesystem fsNamesys;
@@ -81,7 +67,7 @@ public class FSEditLogLoader {
*/
int readLogVersion(DataInputStream in) throws IOException {
int logVersion = 0;
- // Read log file version. Could be missing.
+ // Read log file version. Could be missing.
in.mark(4);
// If edits log is greater than 2G, available method will return negative
// numbers, so we avoid having to call available
@@ -97,7 +83,7 @@ public class FSEditLogLoader {
if (logVersion < FSConstants.LAYOUT_VERSION) // future version
throw new IOException(
"Unexpected version of the file system log file: "
- + logVersion + ". Current version = "
+ + logVersion + ". Current version = "
+ FSConstants.LAYOUT_VERSION + ".");
}
assert logVersion <= Storage.LAST_UPGRADABLE_LAYOUT_VERSION :
@@ -110,7 +96,7 @@ public class FSEditLogLoader {
throws IOException {
BufferedInputStream bin = new BufferedInputStream(edits);
DataInputStream in = new DataInputStream(bin);
-
+
int numEdits = 0;
int logVersion = 0;
@@ -139,9 +125,7 @@ public class FSEditLogLoader {
throws IOException {
FSDirectory fsDir = fsNamesys.dir;
int numEdits = 0;
- String clientName = null;
- String clientMachine = null;
- String path = null;
+
int numOpAdd = 0, numOpClose = 0, numOpDelete = 0,
numOpRenameOld = 0, numOpSetRepl = 0, numOpMkDir = 0,
numOpSetPerm = 0, numOpSetOwner = 0, numOpSetGenStamp = 0,
@@ -161,26 +145,10 @@ public class FSEditLogLoader {
long txId = expectedStartingTxId - 1;
try {
- while (true) {
- long timestamp = 0;
- long mtime = 0;
- long atime = 0;
- long blockSize = 0;
- FSEditLogOpCodes opCode;
- try {
- if (checksum != null) {
- checksum.reset();
- }
- in.mark(1);
- byte opCodeByte = in.readByte();
- opCode = FSEditLogOpCodes.fromByte(opCodeByte);
- if (opCode == FSEditLogOpCodes.OP_INVALID) {
- in.reset(); // reset back to end of file if somebody reads it again
- break; // no more transactions
- }
- } catch (EOFException e) {
- break; // no more transactions
- }
+ FSEditLogOp.Reader reader = new FSEditLogOp.Reader(in, logVersion,
+ checksum);
+ FSEditLogOp op;
+ while ((op = reader.readOp()) != null) {
recentOpcodeOffsets[numEdits % recentOpcodeOffsets.length] =
tracker.getPos();
if (LayoutVersion.supports(Feature.STORED_TXIDS, logVersion)) {
@@ -194,34 +162,34 @@ public class FSEditLogLoader {
}
numEdits++;
- switch (opCode) {
+ switch (op.opCode) {
case OP_ADD:
case OP_CLOSE: {
+ AddCloseOp addCloseOp = (AddCloseOp)op;
+
// versions > 0 support per file replication
// get name and replication
- int length = in.readInt();
- if (-7 == logVersion && length != 3||
- -17 < logVersion && logVersion < -7 && length != 4 ||
- logVersion <= -17 && length != 5) {
- throw new IOException("Incorrect data format." +
- " logVersion is " + logVersion +
- " but writables.length is " +
- length + ". ");
- }
- path = FSImageSerialization.readString(in);
- short replication = fsNamesys.adjustReplication(readShort(in));
- mtime = readLong(in);
- if (LayoutVersion.supports(Feature.FILE_ACCESS_TIME, logVersion)) {
- atime = readLong(in);
+ short replication
+ = fsNamesys.adjustReplication(addCloseOp.replication);
+
+ long blockSize = addCloseOp.blockSize;
+ BlockInfo blocks[] = new BlockInfo[addCloseOp.blocks.length];
+ for (int i = 0; i < addCloseOp.blocks.length; i++) {
+ if(addCloseOp.opCode == FSEditLogOpCodes.OP_ADD
+ && i == addCloseOp.blocks.length-1) {
+ blocks[i] = new BlockInfoUnderConstruction(addCloseOp.blocks[i],
+ replication);
+ } else {
+ blocks[i] = new BlockInfo(addCloseOp.blocks[i], replication);
+ }
}
- if (logVersion < -7) {
- blockSize = readLong(in);
+
+ PermissionStatus permissions = fsNamesys.getUpgradePermission();
+ if (addCloseOp.permissions != null) {
+ permissions = addCloseOp.permissions;
}
- // get blocks
- boolean isFileUnderConstruction = (opCode == FSEditLogOpCodes.OP_ADD);
- BlockInfo blocks[] =
- readBlocks(in, logVersion, isFileUnderConstruction, replication);
-
+
+
// Older versions of HDFS does not store the block size in inode.
// If the file has more than one block, use the size of the
// first block as the blocksize. Otherwise use the default
@@ -234,41 +202,25 @@ public class FSEditLogLoader {
blockSize = Math.max(fsNamesys.getDefaultBlockSize(), first);
}
}
-
- PermissionStatus permissions = fsNamesys.getUpgradePermission();
- if (logVersion <= -11) {
- permissions = PermissionStatus.read(in);
- }
-
- // clientname, clientMachine and block locations of last block.
- if (opCode == FSEditLogOpCodes.OP_ADD && logVersion <= -12) {
- clientName = FSImageSerialization.readString(in);
- clientMachine = FSImageSerialization.readString(in);
- if (-13 <= logVersion) {
- readDatanodeDescriptorArray(in);
- }
- } else {
- clientName = "";
- clientMachine = "";
- }
-
+
+
// The open lease transaction re-creates a file if necessary.
// Delete the file if it already exists.
if (FSNamesystem.LOG.isDebugEnabled()) {
- FSNamesystem.LOG.debug(opCode + ": " + path +
- " numblocks : " + blocks.length +
- " clientHolder " + clientName +
- " clientMachine " + clientMachine);
+ FSNamesystem.LOG.debug(op.opCode + ": " + addCloseOp.path +
+ " numblocks : " + blocks.length +
+ " clientHolder " + addCloseOp.clientName +
+ " clientMachine " + addCloseOp.clientMachine);
}
-
- fsDir.unprotectedDelete(path, mtime);
-
+
+ fsDir.unprotectedDelete(addCloseOp.path, addCloseOp.mtime);
+
// add to the file tree
INodeFile node = (INodeFile)fsDir.unprotectedAddFile(
- path, permissions,
- blocks, replication,
- mtime, atime, blockSize);
- if (isFileUnderConstruction) {
+ addCloseOp.path, permissions,
+ blocks, replication,
+ addCloseOp.mtime, addCloseOp.atime, blockSize);
+ if (addCloseOp.opCode == FSEditLogOpCodes.OP_ADD) {
numOpAdd++;
//
// Replace current node with a INodeUnderConstruction.
@@ -276,252 +228,199 @@ public class FSEditLogLoader {
//
INodeFileUnderConstruction cons = new INodeFileUnderConstruction(
node.getLocalNameBytes(),
- node.getReplication(),
+ node.getReplication(),
node.getModificationTime(),
node.getPreferredBlockSize(),
node.getBlocks(),
node.getPermissionStatus(),
- clientName,
- clientMachine,
+ addCloseOp.clientName,
+ addCloseOp.clientMachine,
null);
- fsDir.replaceNode(path, node, cons);
- fsNamesys.leaseManager.addLease(cons.getClientName(), path);
+ fsDir.replaceNode(addCloseOp.path, node, cons);
+ fsNamesys.leaseManager.addLease(cons.getClientName(),
+ addCloseOp.path);
}
break;
- }
+ }
case OP_SET_REPLICATION: {
numOpSetRepl++;
- path = FSImageSerialization.readString(in);
- short replication = fsNamesys.adjustReplication(readShort(in));
- fsDir.unprotectedSetReplication(path, replication, null);
+ SetReplicationOp setReplicationOp = (SetReplicationOp)op;
+ short replication
+ = fsNamesys.adjustReplication(setReplicationOp.replication);
+ fsDir.unprotectedSetReplication(setReplicationOp.path,
+ replication, null);
break;
- }
+ }
case OP_CONCAT_DELETE: {
numOpConcatDelete++;
- int length = in.readInt();
- if (length < 3) { // trg, srcs.., timestam
- throw new IOException("Incorrect data format. "
- + "Mkdir operation.");
- }
- String trg = FSImageSerialization.readString(in);
- int srcSize = length - 1 - 1; //trg and timestamp
- String [] srcs = new String [srcSize];
- for(int i=0; i<srcSize;i++) {
- srcs[i]= FSImageSerialization.readString(in);
- }
- timestamp = readLong(in);
- fsDir.unprotectedConcat(trg, srcs);
+
+ ConcatDeleteOp concatDeleteOp = (ConcatDeleteOp)op;
+ fsDir.unprotectedConcat(concatDeleteOp.trg, concatDeleteOp.srcs);
break;
}
case OP_RENAME_OLD: {
numOpRenameOld++;
- int length = in.readInt();
- if (length != 3) {
- throw new IOException("Incorrect data format. "
- + "Mkdir operation.");
- }
- String s = FSImageSerialization.readString(in);
- String d = FSImageSerialization.readString(in);
- timestamp = readLong(in);
- HdfsFileStatus dinfo = fsDir.getFileInfo(d, false);
- fsDir.unprotectedRenameTo(s, d, timestamp);
- fsNamesys.changeLease(s, d, dinfo);
+ RenameOldOp renameOp = (RenameOldOp)op;
+ HdfsFileStatus dinfo = fsDir.getFileInfo(renameOp.dst, false);
+ fsDir.unprotectedRenameTo(renameOp.src, renameOp.dst,
+ renameOp.timestamp);
+ fsNamesys.changeLease(renameOp.src, renameOp.dst, dinfo);
break;
}
case OP_DELETE: {
numOpDelete++;
- int length = in.readInt();
- if (length != 2) {
- throw new IOException("Incorrect data format. "
- + "delete operation.");
- }
- path = FSImageSerialization.readString(in);
- timestamp = readLong(in);
- fsDir.unprotectedDelete(path, timestamp);
+
+ DeleteOp deleteOp = (DeleteOp)op;
+ fsDir.unprotectedDelete(deleteOp.path, deleteOp.timestamp);
break;
}
case OP_MKDIR: {
numOpMkDir++;
+ MkdirOp mkdirOp = (MkdirOp)op;
PermissionStatus permissions = fsNamesys.getUpgradePermission();
- int length = in.readInt();
- if (-17 < logVersion && length != 2 ||
- logVersion <= -17 && length != 3) {
- throw new IOException("Incorrect data format. "
- + "Mkdir operation.");
+ if (mkdirOp.permissions != null) {
+ permissions = mkdirOp.permissions;
}
- path = FSImageSerialization.readString(in);
- timestamp = readLong(in);
-
- // The disk format stores atimes for directories as well.
- // However, currently this is not being updated/used because of
- // performance reasons.
- if (LayoutVersion.supports(Feature.FILE_ACCESS_TIME, logVersion)) {
- atime = readLong(in);
- }
-
- if (logVersion <= -11) {
- permissions = PermissionStatus.read(in);
- }
- fsDir.unprotectedMkdir(path, permissions, timestamp);
+
+ fsDir.unprotectedMkdir(mkdirOp.path, permissions,
+ mkdirOp.timestamp);
break;
}
case OP_SET_GENSTAMP: {
numOpSetGenStamp++;
- long lw = in.readLong();
- fsNamesys.setGenerationStamp(lw);
- break;
- }
- case OP_DATANODE_ADD: {
- numOpOther++;
- //Datanodes are not persistent any more.
- FSImageSerialization.DatanodeImage.skipOne(in);
- break;
- }
- case OP_DATANODE_REMOVE: {
- numOpOther++;
- DatanodeID nodeID = new DatanodeID();
- nodeID.readFields(in);
- //Datanodes are not persistent any more.
+ SetGenstampOp setGenstampOp = (SetGenstampOp)op;
+ fsNamesys.setGenerationStamp(setGenstampOp.genStamp);
break;
}
case OP_SET_PERMISSIONS: {
numOpSetPerm++;
- fsDir.unprotectedSetPermission(
- FSImageSerialization.readString(in), FsPermission.read(in));
+
+ SetPermissionsOp setPermissionsOp = (SetPermissionsOp)op;
+ fsDir.unprotectedSetPermission(setPermissionsOp.src,
+ setPermissionsOp.permissions);
break;
}
case OP_SET_OWNER: {
numOpSetOwner++;
- if (logVersion > -11)
- throw new IOException("Unexpected opCode " + opCode
- + " for version " + logVersion);
- fsDir.unprotectedSetOwner(FSImageSerialization.readString(in),
- FSImageSerialization.readString_EmptyAsNull(in),
- FSImageSerialization.readString_EmptyAsNull(in));
+
+ SetOwnerOp setOwnerOp = (SetOwnerOp)op;
+ fsDir.unprotectedSetOwner(setOwnerOp.src, setOwnerOp.username,
+ setOwnerOp.groupname);
break;
}
case OP_SET_NS_QUOTA: {
- fsDir.unprotectedSetQuota(FSImageSerialization.readString(in),
- readLongWritable(in),
+ SetNSQuotaOp setNSQuotaOp = (SetNSQuotaOp)op;
+ fsDir.unprotectedSetQuota(setNSQuotaOp.src,
+ setNSQuotaOp.nsQuota,
FSConstants.QUOTA_DONT_SET);
break;
}
case OP_CLEAR_NS_QUOTA: {
- fsDir.unprotectedSetQuota(FSImageSerialization.readString(in),
+ ClearNSQuotaOp clearNSQuotaOp = (ClearNSQuotaOp)op;
+ fsDir.unprotectedSetQuota(clearNSQuotaOp.src,
FSConstants.QUOTA_RESET,
FSConstants.QUOTA_DONT_SET);
break;
}
-
+
case OP_SET_QUOTA:
- fsDir.unprotectedSetQuota(FSImageSerialization.readString(in),
- readLongWritable(in),
- readLongWritable(in));
-
+ SetQuotaOp setQuotaOp = (SetQuotaOp)op;
+ fsDir.unprotectedSetQuota(setQuotaOp.src,
+ setQuotaOp.nsQuota,
+ setQuotaOp.dsQuota);
break;
-
+
case OP_TIMES: {
numOpTimes++;
- int length = in.readInt();
- if (length != 3) {
- throw new IOException("Incorrect data format. "
- + "times operation.");
- }
- path = FSImageSerialization.readString(in);
- mtime = readLong(in);
- atime = readLong(in);
- fsDir.unprotectedSetTimes(path, mtime, atime, true);
+ TimesOp timesOp = (TimesOp)op;
+
+ fsDir.unprotectedSetTimes(timesOp.path,
+ timesOp.mtime,
+ timesOp.atime, true);
break;
}
case OP_SYMLINK: {
numOpSymlink++;
- int length = in.readInt();
- if (length != 4) {
- throw new IOException("Incorrect data format. "
- + "symlink operation.");
- }
- path = FSImageSerialization.readString(in);
- String value = FSImageSerialization.readString(in);
- mtime = readLong(in);
- atime = readLong(in);
- PermissionStatus perm = PermissionStatus.read(in);
- fsDir.unprotectedSymlink(path, value, mtime, atime, perm);
+
+ SymlinkOp symlinkOp = (SymlinkOp)op;
+ fsDir.unprotectedSymlink(symlinkOp.path, symlinkOp.value,
+ symlinkOp.mtime, symlinkOp.atime,
+ symlinkOp.permissionStatus);
break;
}
case OP_RENAME: {
numOpRename++;
- int length = in.readInt();
- if (length != 3) {
- throw new IOException("Incorrect data format. "
- + "Mkdir operation.");
- }
- String s = FSImageSerialization.readString(in);
- String d = FSImageSerialization.readString(in);
- timestamp = readLong(in);
- Rename[] options = readRenameOptions(in);
- HdfsFileStatus dinfo = fsDir.getFileInfo(d, false);
- fsDir.unprotectedRenameTo(s, d, timestamp, options);
- fsNamesys.changeLease(s, d, dinfo);
+ RenameOp renameOp = (RenameOp)op;
+
+ HdfsFileStatus dinfo = fsDir.getFileInfo(renameOp.dst, false);
+ fsDir.unprotectedRenameTo(renameOp.src, renameOp.dst,
+ renameOp.timestamp, renameOp.options);
+ fsNamesys.changeLease(renameOp.src, renameOp.dst, dinfo);
break;
}
case OP_GET_DELEGATION_TOKEN: {
numOpGetDelegationToken++;
- DelegationTokenIdentifier delegationTokenId =
- new DelegationTokenIdentifier();
- delegationTokenId.readFields(in);
- long expiryTime = readLong(in);
+ GetDelegationTokenOp getDelegationTokenOp
+ = (GetDelegationTokenOp)op;
+
fsNamesys.getDelegationTokenSecretManager()
- .addPersistedDelegationToken(delegationTokenId, expiryTime);
+ .addPersistedDelegationToken(getDelegationTokenOp.token,
+ getDelegationTokenOp.expiryTime);
break;
}
case OP_RENEW_DELEGATION_TOKEN: {
numOpRenewDelegationToken++;
- DelegationTokenIdentifier delegationTokenId =
- new DelegationTokenIdentifier();
- delegationTokenId.readFields(in);
- long expiryTime = readLong(in);
+
+ RenewDelegationTokenOp renewDelegationTokenOp
+ = (RenewDelegationTokenOp)op;
fsNamesys.getDelegationTokenSecretManager()
- .updatePersistedTokenRenewal(delegationTokenId, expiryTime);
+ .updatePersistedTokenRenewal(renewDelegationTokenOp.token,
+ renewDelegationTokenOp.expiryTime);
break;
}
case OP_CANCEL_DELEGATION_TOKEN: {
numOpCancelDelegationToken++;
- DelegationTokenIdentifier delegationTokenId =
- new DelegationTokenIdentifier();
- delegationTokenId.readFields(in);
+
+ CancelDelegationTokenOp cancelDelegationTokenOp
+ = (CancelDelegationTokenOp)op;
fsNamesys.getDelegationTokenSecretManager()
- .updatePersistedTokenCancellation(delegationTokenId);
+ .updatePersistedTokenCancellation(
+ cancelDelegationTokenOp.token);
break;
}
case OP_UPDATE_MASTER_KEY: {
numOpUpdateMasterKey++;
- DelegationKey delegationKey = new DelegationKey();
- delegationKey.readFields(in);
- fsNamesys.getDelegationTokenSecretManager().updatePersistedMasterKey(
- delegationKey);
+ UpdateMasterKeyOp updateMasterKeyOp = (UpdateMasterKeyOp)op;
+ fsNamesys.getDelegationTokenSecretManager()
+ .updatePersistedMasterKey(updateMasterKeyOp.key);
break;
}
case OP_REASSIGN_LEASE: {
numOpReassignLease++;
- String leaseHolder = FSImageSerialization.readString(in);
- path = FSImageSerialization.readString(in);
- String newHolder = FSImageSerialization.readString(in);
- Lease lease = fsNamesys.leaseManager.getLease(leaseHolder);
+ ReassignLeaseOp reassignLeaseOp = (ReassignLeaseOp)op;
+
+ Lease lease = fsNamesys.leaseManager.getLease(
+ reassignLeaseOp.leaseHolder);
INodeFileUnderConstruction pendingFile =
- (INodeFileUnderConstruction) fsDir.getFileINode(path);
- fsNamesys.reassignLeaseInternal(lease, path, newHolder, pendingFile);
+ (INodeFileUnderConstruction) fsDir.getFileINode(
+ reassignLeaseOp.path);
+ fsNamesys.reassignLeaseInternal(lease,
+ reassignLeaseOp.path, reassignLeaseOp.newHolder, pendingFile);
break;
}
case OP_START_LOG_SEGMENT:
case OP_END_LOG_SEGMENT: {
// no data in here currently.
+ numOpOther++;
break;
}
- default: {
- throw new IOException("Never seen opCode " + opCode);
- }
+ case OP_DATANODE_ADD:
+ case OP_DATANODE_REMOVE:
+ numOpOther++;
+ break;
+ default:
+ throw new IOException("Invalid operation read " + op.opCode);
}
- validateChecksum(in, checksum, numEdits);
}
} catch (IOException ex) {
check203UpgradeFailure(logVersion, ex);
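
The hunk above replaces the loader's inline stream parsing with pre-parsed edit-log op objects: each case now casts the op to its concrete subclass and applies the already-decoded fields, and unknown opcodes fail with a single default branch. A minimal, self-contained sketch of that dispatch pattern follows; the class and field names below are simplified stand-ins for illustration, not the real FSEditLogOp hierarchy or the FSDirectory calls it feeds.

import java.io.IOException;

/** Simplified stand-in for the typed edit-log op dispatch shown in the hunk above. */
public class EditOpDispatchSketch {

  enum OpCode { OP_DELETE, OP_MKDIR }

  /** Base class: the stream reader hands the loader fully-parsed op objects. */
  static abstract class EditOp {
    final OpCode opCode;
    EditOp(OpCode opCode) { this.opCode = opCode; }
  }

  static class DeleteOp extends EditOp {
    final String path;
    final long timestamp;
    DeleteOp(String path, long timestamp) {
      super(OpCode.OP_DELETE);
      this.path = path;
      this.timestamp = timestamp;
    }
  }

  static class MkdirOp extends EditOp {
    final String path;
    final long timestamp;
    MkdirOp(String path, long timestamp) {
      super(OpCode.OP_MKDIR);
      this.path = path;
      this.timestamp = timestamp;
    }
  }

  /** Each case casts to the concrete op type and applies its fields; no stream parsing here. */
  static void apply(EditOp op) throws IOException {
    switch (op.opCode) {
      case OP_DELETE: {
        DeleteOp deleteOp = (DeleteOp) op;
        // stand-in for fsDir.unprotectedDelete(deleteOp.path, deleteOp.timestamp)
        System.out.println("delete " + deleteOp.path + " @ " + deleteOp.timestamp);
        break;
      }
      case OP_MKDIR: {
        MkdirOp mkdirOp = (MkdirOp) op;
        // stand-in for fsDir.unprotectedMkdir(mkdirOp.path, permissions, mkdirOp.timestamp)
        System.out.println("mkdir " + mkdirOp.path + " @ " + mkdirOp.timestamp);
        break;
      }
      default:
        throw new IOException("Invalid operation read " + op.opCode);
    }
  }

  public static void main(String[] args) throws IOException {
    apply(new DeleteOp("/tmp/old", 1L));
    apply(new MkdirOp("/tmp/new", 2L));
  }
}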
@@ -569,126 +468,6 @@ public class FSEditLogLoader {
}
/**
- * Validate a transaction's checksum
- */
- private static void validateChecksum(
- DataInputStream in, Checksum checksum, int tid)
- throws IOException {
- if (checksum != null) {
- int calculatedChecksum = (int)checksum.getValue();
- int readChecksum = in.readInt(); // read in checksum
- if (readChecksum != calculatedChecksum) {
- throw new ChecksumException(
- "Transaction " + tid + " is corrupt. Calculated checksum is " +
- calculatedChecksum + " but read checksum " + readChecksum, tid);
- }
- }
- }
-
- /**
- * A class to read in blocks stored in the old format. The only two
- * fields in the block were blockid and length.
- */
- static class BlockTwo implements Writable {
- long blkid;
- long len;
-
- static { // register a ctor
- WritableFactories.setFactory
- (BlockTwo.class,
- new WritableFactory() {
- public Writable newInstance() { return new BlockTwo(); }
- });
- }
-
-
- BlockTwo() {
- blkid = 0;
- len = 0;
- }
- /////////////////////////////////////
- // Writable
- /////////////////////////////////////
- public void write(DataOutput out) throws IOException {
- out.writeLong(blkid);
- out.writeLong(len);
- }
-
- public void readFields(DataInput in) throws IOException {
- this.blkid = in.readLong();
- this.len = in.readLong();
- }
- }
-
- /** This method is defined for compatibility reason. */
- static private DatanodeDescriptor[] readDatanodeDescriptorArray(DataInput in
- ) throws IOException {
- DatanodeDescriptor[] locations = new DatanodeDescriptor[in.readInt()];
- for (int i = 0; i < locations.length; i++) {
- locations[i] = new DatanodeDescriptor();
- locations[i].readFieldsFromFSEditLog(in);
- }
- return locations;
- }
-
- static private short readShort(DataInputStream in) throws IOException {
- return Short.parseShort(FSImageSerialization.readString(in));
- }
-
- static private long readLong(DataInputStream in) throws IOException {
- return Long.parseLong(FSImageSerialization.readString(in));
- }
-
- // a place holder for reading a long
- private static final LongWritable longWritable = new LongWritable();
-
- /** Read an integer from an input stream */
- private static long readLongWritable(DataInputStream in) throws IOException {
- synchronized (longWritable) {
- longWritable.readFields(in);
- return longWritable.get();
- }
- }
-
- static Rename[] readRenameOptions(DataInputStream in) throws IOException {
- BytesWritable writable = new BytesWritable();
- writable.readFields(in);
-
- byte[] bytes = writable.getBytes();
- Rename[] options = new Rename[bytes.length];
-
- for (int i = 0; i < bytes.length; i++) {
- options[i] = Rename.valueOf(bytes[i]);
- }
- return options;
- }
-
- static private BlockInfo[] readBlocks(
- DataInputStream in,
- int logVersion,
- boolean isFileUnderConstruction,
- short replication) throws IOException {
- int numBlocks = in.readInt();
- BlockInfo[] blocks = new BlockInfo[numBlocks];
- Block blk = new Block();
- BlockTwo oldblk = new BlockTwo();
- for (int i = 0; i < numBlocks; i++) {
- if (logVersion <= -14) {
- blk.readFields(in);
- } else {
- oldblk.readFields(in);
- blk.set(oldblk.blkid, oldblk.len,
- GenerationStamp.GRANDFATHER_GENERATION_STAMP);
- }
- if(isFileUnderConstruction && i == numBlocks-1)
- blocks[i] = new BlockInfoUnderConstruction(blk, replication);
- else
- blocks[i] = new BlockInfo(blk, replication);
- }
- return blocks;
- }
-
- /**
* Throw appropriate exception during upgrade from 203, when editlog loading
* could fail due to opcode conflicts.
*/
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1134137&r1=1134136&r2=1134137&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Jun 10 00:11:32 2011
@@ -1396,62 +1396,7 @@ public class FSNamesystem implements FSC
try {
INode myFile = dir.getFileINode(src);
- if (myFile != null && myFile.isUnderConstruction()) {
- INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) myFile;
- //
- // If the file is under construction , then it must be in our
- // leases. Find the appropriate lease record.
- //
- Lease lease = leaseManager.getLeaseByPath(src);
- if (lease == null) {
- throw new AlreadyBeingCreatedException(
- "failed to create file " + src + " for " + holder +
- " on client " + clientMachine +
- " because pendingCreates is non-null but no leases found.");
- }
- //
- // We found the lease for this file. And surprisingly the original
- // holder is trying to recreate this file. This should never occur.
- //
- if (lease.getHolder().equals(holder)) {
- throw new AlreadyBeingCreatedException(
- "failed to create file " + src + " for " + holder +
- " on client " + clientMachine +
- " because current leaseholder is trying to recreate file.");
- }
- assert lease.getHolder().equals(pendingFile.getClientName()) :
- "Current lease holder " + lease.getHolder() +
- " does not match file creator " + pendingFile.getClientName();
- //
- // Current lease holder is different from the requester.
- // If the original holder has not renewed in the last SOFTLIMIT
- // period, then start lease recovery, otherwise fail.
- //
- if (lease.expiredSoftLimit()) {
- LOG.info("startFile: recover lease " + lease + ", src=" + src);
- boolean isClosed = internalReleaseLease(lease, src, null);
- if(!isClosed)
- throw new RecoveryInProgressException(
- "Failed to close file " + src +
- ". Lease recovery is in progress. Try again later.");
-
- } else {
- BlockInfoUnderConstruction lastBlock=pendingFile.getLastBlock();
- if(lastBlock != null && lastBlock.getBlockUCState() ==
- BlockUCState.UNDER_RECOVERY) {
- throw new RecoveryInProgressException(
- "Recovery in progress, file [" + src + "], " +
- "lease owner [" + lease.getHolder() + "]");
- } else {
- throw new AlreadyBeingCreatedException(
- "Failed to create file [" + src + "] for [" + holder +
- "] on client [" + clientMachine +
- "], because this file is already being created by [" +
- pendingFile.getClientName() + "] on [" +
- pendingFile.getClientMachine() + "]");
- }
- }
- }
+ recoverLeaseInternal(myFile, src, holder, clientMachine, false);
try {
blockManager.verifyReplication(src, replication, clientMachine);
@@ -1538,6 +1483,120 @@ public class FSNamesystem implements FSC
}
/**
+ * Recover lease;
+ * Immediately revoke the lease of the current lease holder and start lease
+ * recovery so that the file can be forced to be closed.
+ *
+ * @param src the path of the file to start lease recovery
+ * @param holder the lease holder's name
+ * @param clientMachine the client machine's name
+ * @return true if the file is already closed
+ * @throws IOException
+ */
+ synchronized boolean recoverLease(String src, String holder, String clientMachine)
+ throws IOException {
+ if (isInSafeMode()) {
+ throw new SafeModeException(
+ "Cannot recover the lease of " + src, safeMode);
+ }
+ if (!DFSUtil.isValidName(src)) {
+ throw new IOException("Invalid file name: " + src);
+ }
+
+ INode inode = dir.getFileINode(src);
+ if (inode == null) {
+ throw new FileNotFoundException("File not found " + src);
+ }
+
+ if (!inode.isUnderConstruction()) {
+ return true;
+ }
+ if (isPermissionEnabled) {
+ checkPathAccess(src, FsAction.WRITE);
+ }
+
+ recoverLeaseInternal(inode, src, holder, clientMachine, true);
+ return false;
+ }
+
+ private void recoverLeaseInternal(INode fileInode,
+ String src, String holder, String clientMachine, boolean force)
+ throws IOException {
+ if (fileInode != null && fileInode.isUnderConstruction()) {
+ INodeFileUnderConstruction pendingFile = (INodeFileUnderConstruction) fileInode;
+ //
+ // If the file is under construction , then it must be in our
+ // leases. Find the appropriate lease record.
+ //
+ Lease lease = leaseManager.getLease(holder);
+ //
+ // We found the lease for this file. And surprisingly the original
+ // holder is trying to recreate this file. This should never occur.
+ //
+ if (!force && lease != null) {
+ Lease leaseFile = leaseManager.getLeaseByPath(src);
+ if ((leaseFile != null && leaseFile.equals(lease)) ||
+ lease.getHolder().equals(holder)) {
+ throw new AlreadyBeingCreatedException(
+ "failed to create file " + src + " for " + holder +
+ " on client " + clientMachine +
+ " because current leaseholder is trying to recreate file.");
+ }
+ }
+ //
+ // Find the original holder.
+ //
+ lease = leaseManager.getLease(pendingFile.getClientName());
+ if (lease == null) {
+ throw new AlreadyBeingCreatedException(
+ "failed to create file " + src + " for " + holder +
+ " on client " + clientMachine +
+ " because pendingCreates is non-null but no leases found.");
+ }
+ if (force) {
+ // close now: no need to wait for soft lease expiration and
+ // close only the file src
+ LOG.info("recoverLease: recover lease " + lease + ", src=" + src +
+ " from client " + pendingFile.getClientName());
+ internalReleaseLease(lease, src, holder);
+ } else {
+ assert lease.getHolder().equals(pendingFile.getClientName()) :
+ "Current lease holder " + lease.getHolder() +
+ " does not match file creator " + pendingFile.getClientName();
+ //
+ // If the original holder has not renewed in the last SOFTLIMIT
+ // period, then start lease recovery.
+ //
+ if (lease.expiredSoftLimit()) {
+ LOG.info("startFile: recover lease " + lease + ", src=" + src +
+ " from client " + pendingFile.getClientName());
+ boolean isClosed = internalReleaseLease(lease, src, null);
+ if(!isClosed)
+ throw new RecoveryInProgressException(
+ "Failed to close file " + src +
+ ". Lease recovery is in progress. Try again later.");
+ } else {
+ BlockInfoUnderConstruction lastBlock=pendingFile.getLastBlock();
+ if(lastBlock != null && lastBlock.getBlockUCState() ==
+ BlockUCState.UNDER_RECOVERY) {
+ throw new RecoveryInProgressException(
+ "Recovery in progress, file [" + src + "], " +
+ "lease owner [" + lease.getHolder() + "]");
+ } else {
+ throw new AlreadyBeingCreatedException(
+ "Failed to create file [" + src + "] for [" + holder +
+ "] on client [" + clientMachine +
+ "], because this file is already being created by [" +
+ pendingFile.getClientName() + "] on [" +
+ pendingFile.getClientMachine() + "]");
+ }
+ }
+ }
+ }
+
+ }
+
+ /**
* Append to an existing file in the namespace.
*/
LocatedBlock appendFile(String src, String holder, String clientMachine)
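
The new recoverLease path lets a client revoke the current lease holder and force a file closed instead of waiting out the soft-limit expiry; per the javadoc above, it returns true only once the file is already closed. A hedged client-side usage sketch follows, assuming a DistributedFileSystem#recoverLease(Path) wrapper over the new ClientProtocol call; that wrapper name is an assumption for illustration, not quoted from this patch.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

/**
 * Sketch: ask the NameNode to revoke the current lease holder and force the
 * file closed, polling until recoverLease reports the file is closed.
 * Assumes a DistributedFileSystem#recoverLease(Path) wrapper over the
 * ClientProtocol#recoverLease(src, clientName) RPC added in this merge.
 */
public class RecoverLeaseSketch {
  public static void main(String[] args) throws Exception {
    Path file = new Path(args[0]);
    FileSystem fs = FileSystem.get(new Configuration());
    if (!(fs instanceof DistributedFileSystem)) {
      throw new IOException("recoverLease is HDFS-specific");
    }
    DistributedFileSystem dfs = (DistributedFileSystem) fs;

    // recoverLease returns true once the file is already closed; false means
    // recovery has been started, so poll until block recovery completes.
    boolean closed = dfs.recoverLease(file);
    while (!closed) {
      Thread.sleep(1000);
      closed = dfs.recoverLease(file);
    }
    System.out.println(file + " is closed; safe to read");
  }
}

The poll-until-closed loop mirrors the return contract described in the hunk above: a false return means lease recovery is in progress rather than failed.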
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1134137&r1=1134136&r2=1134137&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Fri Jun 10 00:11:32 2011
@@ -838,6 +838,12 @@ public class NameNode implements Namenod
}
/** {@inheritDoc} */
+ public boolean recoverLease(String src, String clientName) throws IOException {
+ String clientMachine = getClientMachine();
+ return namesystem.recoverLease(src, clientName, clientMachine);
+ }
+
+ /** {@inheritDoc} */
public boolean setReplication(String src, short replication)
throws IOException {
return namesystem.setReplication(src, replication);