Posted to hdfs-commits@hadoop.apache.org by vi...@apache.org on 2013/10/30 23:22:22 UTC
svn commit: r1537330 [4/11] - in
/hadoop/common/branches/YARN-321/hadoop-hdfs-project: ./
hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/server/
hadoop-hdfs-nfs/
hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/
hadoop-hdfs-nfs/sr...
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/CMakeLists.txt?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/CMakeLists.txt (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/CMakeLists.txt Wed Oct 30 22:21:59 2013
@@ -62,6 +62,11 @@ endfunction()
INCLUDE(CheckCSourceCompiles)
CHECK_C_SOURCE_COMPILES("int main(void) { static __thread int i = 0; return 0; }" HAVE_BETTER_TLS)
+# Check if we need to link the dl library to get dlopen.
+# dlopen on Linux is in a separate library, but on FreeBSD it's in libc.
+INCLUDE(CheckLibraryExists)
+CHECK_LIBRARY_EXISTS(dl dlopen "" NEED_LINK_DL)
+
find_package(JNI REQUIRED)
if (NOT GENERATED_JAVAH)
# Must identify where the generated headers have been placed
@@ -89,9 +94,13 @@ add_dual_library(hdfs
main/native/libhdfs/jni_helper.c
main/native/libhdfs/hdfs.c
)
+if (NEED_LINK_DL)
+ set(LIB_DL dl)
+endif(NEED_LINK_DL)
+
target_link_dual_libraries(hdfs
${JAVA_JVM_LIBRARY}
- dl
+ ${LIB_DL}
pthread
)
dual_output_directory(hdfs target/usr/local/lib)
@@ -142,6 +151,7 @@ target_link_libraries(test_native_mini_d
)
add_executable(test_libhdfs_threaded
+ main/native/libhdfs/expect.c
main/native/libhdfs/test_libhdfs_threaded.c
)
target_link_libraries(test_libhdfs_threaded
@@ -150,6 +160,16 @@ target_link_libraries(test_libhdfs_threa
pthread
)
+add_executable(test_libhdfs_zerocopy
+ main/native/libhdfs/expect.c
+ main/native/libhdfs/test/test_libhdfs_zerocopy.c
+)
+target_link_libraries(test_libhdfs_zerocopy
+ hdfs
+ native_mini_dfs
+ pthread
+)
+
IF(REQUIRE_LIBWEBHDFS)
add_subdirectory(contrib/libwebhdfs)
ENDIF(REQUIRE_LIBWEBHDFS)
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/pom.xml?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/pom.xml (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/pom.xml Wed Oct 30 22:21:59 2013
@@ -36,8 +36,26 @@ http://maven.apache.org/xsd/maven-4.0.0.
<hadoop.common.build.dir>${basedir}/../../../../../hadoop-common-project/hadoop-common/target</hadoop.common.build.dir>
</properties>
+ <dependencyManagement>
+ <dependencies>
+ <!-- This is a really old version of netty that gets privatized
+ via shading, and hence is not managed via a parent pom -->
+ <dependency>
+ <groupId>org.jboss.netty</groupId>
+ <artifactId>netty</artifactId>
+ <version>3.2.4.Final</version>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+
<dependencies>
<dependency>
+ <groupId>org.jboss.netty</groupId>
+ <artifactId>netty</artifactId>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<scope>compile</scope>
Propchange: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs-config.cmd
------------------------------------------------------------------------------
svn:eol-style = native
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs.cmd
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs.cmd?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs.cmd (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs.cmd Wed Oct 30 22:21:59 2013
@@ -47,7 +47,17 @@ if "%1" == "--config" (
goto print_usage
)
- call :%hdfs-command% %hdfs-command-arguments%
+ set hdfscommands=dfs namenode secondarynamenode journalnode zkfc datanode dfsadmin haadmin fsck balancer jmxget oiv oev fetchdt getconf groups snapshotDiff lsSnapshottableDir
+ for %%i in ( %hdfscommands% ) do (
+ if %hdfs-command% == %%i set hdfscommand=true
+ )
+ if defined hdfscommand (
+ call :%hdfs-command%
+ ) else (
+ set CLASSPATH=%CLASSPATH%;%CD%
+ set CLASS=%hdfs-command%
+ )
+
set java_arguments=%JAVA_HEAP_MAX% %HADOOP_OPTS% -classpath %CLASSPATH% %CLASS% %hdfs-command-arguments%
call %JAVA% %java_arguments%
@@ -58,6 +68,11 @@ goto :eof
set HADOOP_OPTS=%HADOOP_OPTS% %HADOOP_NAMENODE_OPTS%
goto :eof
+:journalnode
+ set CLASS=org.apache.hadoop.hdfs.qjournal.server.JournalNode
+ set HADOOP_OPTS=%HADOOP_OPTS% %HADOOP_JOURNALNODE_OPTS%
+ goto :eof
+
:zkfc
set CLASS=org.apache.hadoop.hdfs.tools.DFSZKFailoverController
set HADOOP_OPTS=%HADOOP_OPTS% %HADOOP_ZKFC_OPTS%
@@ -123,6 +138,14 @@ goto :eof
set CLASS=org.apache.hadoop.hdfs.tools.GetGroups
goto :eof
+:snapshotDiff
+ set CLASS=org.apache.hadoop.hdfs.tools.snapshot.SnapshotDiff
+ goto :eof
+
+:lsSnapshottableDir
+ set CLASS=org.apache.hadoop.hdfs.tools.snapshot.LsSnapshottableDir
+ goto :eof
+
@rem This changes %1, %2 etc. Hence those cannot be used after calling this.
:make_command_arguments
if "%1" == "--config" (
@@ -153,9 +176,11 @@ goto :eof
@echo namenode -format format the DFS filesystem
@echo secondarynamenode run the DFS secondary namenode
@echo namenode run the DFS namenode
+ @echo journalnode run the DFS journalnode
@echo zkfc run the ZK Failover Controller daemon
@echo datanode run a DFS datanode
@echo dfsadmin run a DFS admin client
+ @echo haadmin run a DFS HA admin client
@echo fsck run a DFS filesystem checking utility
@echo balancer run a cluster balancing utility
@echo jmxget get JMX exported values from NameNode or DataNode.
@@ -164,7 +189,10 @@ goto :eof
@echo fetchdt fetch a delegation token from the NameNode
@echo getconf get config values from configuration
@echo groups get the groups which users belong to
- @echo Use -help to see options
+ @echo snapshotDiff diff two snapshots of a directory or diff the
+ @echo current directory contents with a snapshot
+ @echo lsSnapshottableDir list all snapshottable dirs owned by the current user
+ @echo Use -help to see options
@echo.
@echo Most commands print help when invoked w/o parameters.
Propchange: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs.cmd
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/start-dfs.cmd
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/stop-dfs.cmd
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/
------------------------------------------------------------------------------
Merged /hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java:r1519784-1537326
Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java:r1523400,1523875,1525828,1531125,1532468,1532857,1532932,1533917,1534426
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java Wed Oct 30 22:21:59 2013
@@ -20,12 +20,16 @@ package org.apache.hadoop.hdfs;
import java.io.IOException;
import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.hdfs.client.ClientMmap;
+import org.apache.hadoop.hdfs.client.ClientMmapManager;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
/**
* A BlockReader is responsible for reading a single block
* from a single datanode.
*/
public interface BlockReader extends ByteBufferReadable {
+
/* same interface as inputStream java.io.InputStream#read()
* used by DFSInputStream#read()
@@ -81,4 +85,14 @@ public interface BlockReader extends Byt
* All short-circuit reads are also local.
*/
boolean isShortCircuit();
+
+ /**
+ * Get a ClientMmap object for this BlockReader.
+ *
+ * @param curBlock The current block.
+ * @return The ClientMmap object, or null if mmap is not
+ * supported.
+ */
+ ClientMmap getClientMmap(LocatedBlock curBlock,
+ ClientMmapManager mmapManager);
}
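
The getClientMmap() contract above returns null whenever mmap is unsupported (as BlockReaderLocalLegacy below always does), so callers need a copying fallback. A minimal sketch of that calling pattern, assuming a BlockReader, LocatedBlock, and ClientMmapManager are already in hand; the ref()/unref() pairing mirrors what DFSInputStream does later in this commit:

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import org.apache.hadoop.hdfs.BlockReader;
    import org.apache.hadoop.hdfs.client.ClientMmap;
    import org.apache.hadoop.hdfs.client.ClientMmapManager;
    import org.apache.hadoop.hdfs.protocol.LocatedBlock;

    // Hedged sketch, not part of the commit: using the new getClientMmap()
    // contract with a copying fallback when mmap is unavailable.
    class MmapReadSketch {
      static void readBlock(BlockReader reader, LocatedBlock block,
          ClientMmapManager manager) throws IOException {
        ClientMmap mmap = reader.getClientMmap(block, manager);
        if (mmap == null) {
          byte[] buf = new byte[8192];   // mmap unsupported: copying read
          reader.read(buf, 0, buf.length);
          return;
        }
        mmap.ref();                      // take a reference before use
        try {
          ByteBuffer buf = mmap.getMappedByteBuffer().asReadOnlyBuffer();
          // ... consume buf ...
        } finally {
          mmap.unref();                  // balance the ref() above
        }
      }
    }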
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java Wed Oct 30 22:21:59 2013
@@ -22,11 +22,15 @@ import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import org.apache.hadoop.conf.Configuration;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.client.ClientMmap;
+import org.apache.hadoop.hdfs.client.ClientMmapManager;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.util.DirectBufferPool;
import org.apache.hadoop.io.IOUtils;
@@ -87,6 +91,8 @@ class BlockReaderLocal implements BlockR
private final ExtendedBlock block;
private final FileInputStreamCache fisCache;
+ private ClientMmap clientMmap;
+ private boolean mmapDisabled;
private static int getSlowReadBufferNumChunks(int bufSize,
int bytesPerChecksum) {
@@ -113,6 +119,8 @@ class BlockReaderLocal implements BlockR
this.datanodeID = datanodeID;
this.block = block;
this.fisCache = fisCache;
+ this.clientMmap = null;
+ this.mmapDisabled = false;
// read and handle the common header here. For now just a version
checksumIn.getChannel().position(0);
@@ -487,6 +495,10 @@ class BlockReaderLocal implements BlockR
@Override
public synchronized void close() throws IOException {
+ if (clientMmap != null) {
+ clientMmap.unref();
+ clientMmap = null;
+ }
if (fisCache != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("putting FileInputStream for " + filename +
@@ -534,4 +546,30 @@ class BlockReaderLocal implements BlockR
public boolean isShortCircuit() {
return true;
}
+
+ @Override
+ public ClientMmap getClientMmap(LocatedBlock curBlock,
+ ClientMmapManager mmapManager) {
+ if (clientMmap == null) {
+ if (mmapDisabled) {
+ return null;
+ }
+ try {
+ clientMmap = mmapManager.fetch(datanodeID, block, dataIn);
+ if (clientMmap == null) {
+ mmapDisabled = true;
+ return null;
+ }
+ } catch (InterruptedException e) {
+ LOG.error("Interrupted while setting up mmap for " + filename, e);
+ Thread.currentThread().interrupt();
+ return null;
+ } catch (IOException e) {
+ LOG.error("unable to set up mmap for " + filename, e);
+ mmapDisabled = true;
+ return null;
+ }
+ }
+ return clientMmap;
+ }
}
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java Wed Oct 30 22:21:59 2013
@@ -28,6 +28,8 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
+import org.apache.hadoop.hdfs.client.ClientMmap;
+import org.apache.hadoop.hdfs.client.ClientMmapManager;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -35,6 +37,7 @@ import org.apache.hadoop.hdfs.protocol.B
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.util.DirectBufferPool;
@@ -701,4 +704,10 @@ class BlockReaderLocalLegacy implements
public boolean isShortCircuit() {
return true;
}
+
+ @Override
+ public ClientMmap getClientMmap(LocatedBlock curBlock,
+ ClientMmapManager mmapManager) {
+ return null;
+ }
}
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Wed Oct 30 22:21:59 2013
@@ -27,6 +27,9 @@ import static org.apache.hadoop.hdfs.DFS
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_RETRIES_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHED_CONN_RETRY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_READS;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_WRITES;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_READAHEAD;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT;
@@ -44,9 +47,6 @@ import static org.apache.hadoop.hdfs.DFS
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_READS;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_WRITES;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_READAHEAD;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
@@ -102,9 +102,11 @@ import org.apache.hadoop.fs.MD5MD5CRC32G
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Options.ChecksumOpt;
import org.apache.hadoop.fs.ParentNotDirectoryException;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.VolumeId;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.client.ClientMmapManager;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
@@ -115,13 +117,13 @@ import org.apache.hadoop.hdfs.protocol.D
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
+import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
@@ -146,6 +148,7 @@ import org.apache.hadoop.io.EnumSetWrita
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.retry.LossyRetryInvocationHandler;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
@@ -206,7 +209,43 @@ public class DFSClient implements java.i
private boolean shouldUseLegacyBlockReaderLocal;
private final CachingStrategy defaultReadCachingStrategy;
private final CachingStrategy defaultWriteCachingStrategy;
+ private ClientMmapManager mmapManager;
+ private static final ClientMmapManagerFactory MMAP_MANAGER_FACTORY =
+ new ClientMmapManagerFactory();
+
+ private static final class ClientMmapManagerFactory {
+ private ClientMmapManager mmapManager = null;
+ /**
+ * Tracks the number of users of mmapManager.
+ */
+ private int refcnt = 0;
+
+ synchronized ClientMmapManager get(Configuration conf) {
+ if (refcnt++ == 0) {
+ mmapManager = ClientMmapManager.fromConf(conf);
+ } else {
+ String mismatches = mmapManager.verifyConfigurationMatches(conf);
+ if (!mismatches.isEmpty()) {
+ LOG.warn("The ClientMmapManager settings you specified " +
+ "have been ignored because another thread created the " +
+ "ClientMmapManager first. " + mismatches);
+ }
+ }
+ return mmapManager;
+ }
+
+ synchronized void unref(ClientMmapManager mmapManager) {
+ if (this.mmapManager != mmapManager) {
+ throw new IllegalArgumentException();
+ }
+ if (--refcnt == 0) {
+ IOUtils.cleanup(LOG, mmapManager);
+ mmapManager = null;
+ }
+ }
+ }
+
/**
* DFSClient configuration
*/
@@ -453,7 +492,11 @@ public class DFSClient implements java.i
/**
* Create a new DFSClient connected to the given nameNodeUri or rpcNamenode.
- * Exactly one of nameNodeUri or rpcNamenode must be null.
+ * If HA is enabled and a positive value is set for
+ * {@link DFSConfigKeys#DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY} in the
+ * configuration, the DFSClient will use {@link LossyRetryInvocationHandler}
+ * as its RetryInvocationHandler. Otherwise one of nameNodeUri or rpcNamenode
+ * must be null.
*/
@VisibleForTesting
public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
@@ -477,7 +520,23 @@ public class DFSClient implements java.i
this.clientName = "DFSClient_" + dfsClientConf.taskId + "_" +
DFSUtil.getRandom().nextInt() + "_" + Thread.currentThread().getId();
- if (rpcNamenode != null) {
+ int numResponseToDrop = conf.getInt(
+ DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY,
+ DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT);
+ NameNodeProxies.ProxyAndInfo<ClientProtocol> proxyInfo = null;
+ if (numResponseToDrop > 0) {
+ // This case is used for testing.
+ LOG.warn(DFSConfigKeys.DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY
+ + " is set to " + numResponseToDrop
+ + ", this hacked client will proactively drop responses");
+ proxyInfo = NameNodeProxies.createProxyWithLossyRetryHandler(conf,
+ nameNodeUri, ClientProtocol.class, numResponseToDrop);
+ }
+
+ if (proxyInfo != null) {
+ this.dtService = proxyInfo.getDelegationTokenService();
+ this.namenode = proxyInfo.getProxy();
+ } else if (rpcNamenode != null) {
// This case is used for testing.
Preconditions.checkArgument(nameNodeUri == null);
this.namenode = rpcNamenode;
@@ -485,9 +544,8 @@ public class DFSClient implements java.i
} else {
Preconditions.checkArgument(nameNodeUri != null,
"null URI");
- NameNodeProxies.ProxyAndInfo<ClientProtocol> proxyInfo =
- NameNodeProxies.createProxy(conf, nameNodeUri, ClientProtocol.class);
-
+ proxyInfo = NameNodeProxies.createProxy(conf, nameNodeUri,
+ ClientProtocol.class);
this.dtService = proxyInfo.getDelegationTokenService();
this.namenode = proxyInfo.getProxy();
}
@@ -515,8 +573,9 @@ public class DFSClient implements java.i
new CachingStrategy(readDropBehind, readahead);
this.defaultWriteCachingStrategy =
new CachingStrategy(writeDropBehind, readahead);
+ this.mmapManager = MMAP_MANAGER_FACTORY.get(conf);
}
-
+
/**
* Return the socket addresses to use with each configured
* local interface. Local interfaces may be specified by IP
@@ -719,9 +778,12 @@ public class DFSClient implements java.i
/** Abort and release resources held. Ignore all errors. */
void abort() {
+ if (mmapManager != null) {
+ MMAP_MANAGER_FACTORY.unref(mmapManager);
+ mmapManager = null;
+ }
clientRunning = false;
closeAllFilesBeingWritten(true);
-
try {
// remove reference to this client and stop the renewer,
// if there is no more clients under the renewer.
@@ -765,6 +827,10 @@ public class DFSClient implements java.i
*/
@Override
public synchronized void close() throws IOException {
+ if (mmapManager != null) {
+ MMAP_MANAGER_FACTORY.unref(mmapManager);
+ mmapManager = null;
+ }
if(clientRunning) {
closeAllFilesBeingWritten(false);
clientRunning = false;
@@ -825,10 +891,15 @@ public class DFSClient implements java.i
assert dtService != null;
Token<DelegationTokenIdentifier> token =
namenode.getDelegationToken(renewer);
- token.setService(this.dtService);
- LOG.info("Created " + DelegationTokenIdentifier.stringifyToken(token));
+ if (token != null) {
+ token.setService(this.dtService);
+ LOG.info("Created " + DelegationTokenIdentifier.stringifyToken(token));
+ } else {
+ LOG.info("Cannot get delegation token from " + renewer);
+ }
return token;
+
}
/**
@@ -2493,4 +2564,9 @@ public class DFSClient implements java.i
public CachingStrategy getDefaultWriteCachingStrategy() {
return defaultWriteCachingStrategy;
}
+
+ @VisibleForTesting
+ public ClientMmapManager getMmapManager() {
+ return mmapManager;
+ }
}
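
The ClientMmapManagerFactory above is a reference-counted singleton: the first get() builds the manager, later callers share it, and the last unref() closes it. Note that after the last unref() it is the field, not the like-named parameter, that needs clearing; the stripped-down sketch below avoids the shadowing entirely. A generic Closeable stands in for ClientMmapManager, and all names here are illustrative:

    import java.io.Closeable;
    import java.io.IOException;

    // Hedged sketch of the reference-counted singleton pattern used by
    // ClientMmapManagerFactory; R stands in for ClientMmapManager.
    class RefCountedSingleton<R extends Closeable> {
      interface Factory<R> { R create(); }

      private R instance;
      private int refcnt;

      synchronized R get(Factory<R> factory) {
        if (refcnt++ == 0) {
          instance = factory.create();  // first caller creates the instance
        }
        return instance;                // later callers share it
      }

      synchronized void unref(R released) throws IOException {
        if (instance != released) {
          throw new IllegalArgumentException("unknown instance");
        }
        if (--refcnt == 0) {
          instance.close();
          instance = null;              // clear the field, not the parameter
        }
      }
    }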
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Wed Oct 30 22:21:59 2013
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault;
/**
* This class contains constants for configuration keys used
@@ -192,7 +193,10 @@ public class DFSConfigKeys extends Commo
public static final boolean DFS_DATANODE_SYNCONCLOSE_DEFAULT = false;
public static final String DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_KEY = "dfs.datanode.socket.reuse.keepalive";
public static final int DFS_DATANODE_SOCKET_REUSE_KEEPALIVE_DEFAULT = 1000;
-
+
+ public static final String DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_KEY = "dfs.namenode.datanode.registration.ip-hostname-check";
+ public static final boolean DFS_NAMENODE_DATANODE_REGISTRATION_IP_HOSTNAME_CHECK_DEFAULT = true;
+
// Whether to enable datanode's stale state detection and usage for reads
public static final String DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY = "dfs.namenode.avoid.read.stale.datanode";
public static final boolean DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_DEFAULT = false;
@@ -267,6 +271,8 @@ public class DFSConfigKeys extends Commo
public static final String DFS_CLIENT_LOCAL_INTERFACES = "dfs.client.local.interfaces";
public static final String DFS_NAMENODE_AUDIT_LOGGERS_KEY = "dfs.namenode.audit.loggers";
public static final String DFS_NAMENODE_DEFAULT_AUDIT_LOGGER_NAME = "default";
+ public static final String DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_KEY = "dfs.namenode.audit.log.token.tracking.id";
+ public static final boolean DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_DEFAULT = false;
// Much code in hdfs is not yet updated to use these keys.
public static final String DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY = "dfs.client.block.write.locateFollowingBlock.retries";
@@ -346,6 +352,8 @@ public class DFSConfigKeys extends Commo
public static final String DFS_BLOCK_ACCESS_TOKEN_LIFETIME_KEY = "dfs.block.access.token.lifetime";
public static final long DFS_BLOCK_ACCESS_TOKEN_LIFETIME_DEFAULT = 600L;
+ public static final String DFS_BLOCK_REPLICATOR_CLASSNAME_KEY = "dfs.block.replicator.classname";
+ public static final Class<BlockPlacementPolicyDefault> DFS_BLOCK_REPLICATOR_CLASSNAME_DEFAULT = BlockPlacementPolicyDefault.class;
public static final String DFS_REPLICATION_MAX_KEY = "dfs.replication.max";
public static final int DFS_REPLICATION_MAX_DEFAULT = 512;
public static final String DFS_DF_INTERVAL_KEY = "dfs.df.interval";
@@ -371,6 +379,12 @@ public class DFSConfigKeys extends Commo
public static final int DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT = 1024 * 1024;
public static final String DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC = "dfs.client.domain.socket.data.traffic";
public static final boolean DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT = false;
+ public static final String DFS_CLIENT_MMAP_CACHE_SIZE = "dfs.client.mmap.cache.size";
+ public static final int DFS_CLIENT_MMAP_CACHE_SIZE_DEFAULT = 1024;
+ public static final String DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS = "dfs.client.mmap.cache.timeout.ms";
+ public static final long DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS_DEFAULT = 15 * 60 * 1000;
+ public static final String DFS_CLIENT_MMAP_CACHE_THREAD_RUNS_PER_TIMEOUT = "dfs.client.mmap.cache.thread.runs.per.timeout";
+ public static final int DFS_CLIENT_MMAP_CACHE_THREAD_RUNS_PER_TIMEOUT_DEFAULT = 4;
// property for fsimage compression
public static final String DFS_IMAGE_COMPRESS_KEY = "dfs.image.compress";
@@ -497,9 +511,27 @@ public class DFSConfigKeys extends Commo
public static final long DFS_NAMENODE_RETRY_CACHE_EXPIRYTIME_MILLIS_DEFAULT = 600000; // 10 minutes
public static final String DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_KEY = "dfs.namenode.retrycache.heap.percent";
public static final float DFS_NAMENODE_RETRY_CACHE_HEAP_PERCENT_DEFAULT = 0.03f;
+
+ // The number of NN responses proactively dropped by the client in each RPC call.
+ // For testing the NN retry cache, set this property to a positive value.
+ public static final String DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_KEY = "dfs.client.test.drop.namenode.response.number";
+ public static final int DFS_CLIENT_TEST_DROP_NAMENODE_RESPONSE_NUM_DEFAULT = 0;
+
// Hidden configuration undocumented in hdfs-site.xml
// Timeout to wait for block receiver and responder thread to stop
public static final String DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY = "dfs.datanode.xceiver.stop.timeout.millis";
public static final long DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT = 60000;
+
+ // WebHDFS retry policy
+ public static final String DFS_HTTP_CLIENT_RETRY_POLICY_ENABLED_KEY = "dfs.http.client.retry.policy.enabled";
+ public static final boolean DFS_HTTP_CLIENT_RETRY_POLICY_ENABLED_DEFAULT = false;
+ public static final String DFS_HTTP_CLIENT_RETRY_POLICY_SPEC_KEY = "dfs.http.client.retry.policy.spec";
+ public static final String DFS_HTTP_CLIENT_RETRY_POLICY_SPEC_DEFAULT = "10000,6,60000,10"; //t1,n1,t2,n2,...
+ public static final String DFS_HTTP_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY = "dfs.http.client.failover.max.attempts";
+ public static final int DFS_HTTP_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT = 15;
+ public static final String DFS_HTTP_CLIENT_FAILOVER_SLEEPTIME_BASE_KEY = "dfs.http.client.failover.sleep.base.millis";
+ public static final int DFS_HTTP_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT = 500;
+ public static final String DFS_HTTP_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY = "dfs.http.client.failover.sleep.max.millis";
+ public static final int DFS_HTTP_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT = 15000;
}
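
All of the keys added above are ordinary Configuration entries, so exercising them is mechanical. A hedged sketch follows; the values are arbitrary illustrations, not tuning recommendations:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    // Hedged sketch: setting a few of the new keys. Values are examples only.
    public class NewKeysSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInt(DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE, 256);
        conf.setLong(DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS,
            5 * 60 * 1000L);
        conf.setBoolean(
            DFSConfigKeys.DFS_HTTP_CLIENT_RETRY_POLICY_ENABLED_KEY, true);
        conf.setInt(
            DFSConfigKeys.DFS_HTTP_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY, 10);
      }
    }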
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java Wed Oct 30 22:21:59 2013
@@ -24,6 +24,7 @@ import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.AbstractMap;
import java.util.ArrayList;
+import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -36,11 +37,15 @@ import java.util.concurrent.ConcurrentHa
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.fs.ByteBufferUtil;
import org.apache.hadoop.fs.CanSetDropBehind;
import org.apache.hadoop.fs.CanSetReadahead;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
+import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.hdfs.client.ClientMmap;
import org.apache.hadoop.hdfs.net.DomainPeer;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.net.TcpPeerServer;
@@ -54,12 +59,14 @@ import org.apache.hadoop.hdfs.security.t
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
+import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.IdentityHashStore;
import com.google.common.annotations.VisibleForTesting;
@@ -69,7 +76,8 @@ import com.google.common.annotations.Vis
****************************************************************/
@InterfaceAudience.Private
public class DFSInputStream extends FSInputStream
-implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead {
+implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
+ HasEnhancedByteBufferAccess {
@VisibleForTesting
static boolean tcpReadsDisabledForTesting = false;
private final PeerCache peerCache;
@@ -87,17 +95,28 @@ implements ByteBufferReadable, CanSetDro
private CachingStrategy cachingStrategy;
private final ReadStatistics readStatistics = new ReadStatistics();
+ /**
+ * Track the ByteBuffers that we have handed out to readers.
+ *
+ * The value type can be either ByteBufferPool or ClientMmap, depending on
+ * whether this is a memory-mapped buffer or not.
+ */
+ private final IdentityHashStore<ByteBuffer, Object>
+ extendedReadBuffers = new IdentityHashStore<ByteBuffer, Object>(0);
+
public static class ReadStatistics {
public ReadStatistics() {
this.totalBytesRead = 0;
this.totalLocalBytesRead = 0;
this.totalShortCircuitBytesRead = 0;
+ this.totalZeroCopyBytesRead = 0;
}
public ReadStatistics(ReadStatistics rhs) {
this.totalBytesRead = rhs.getTotalBytesRead();
this.totalLocalBytesRead = rhs.getTotalLocalBytesRead();
this.totalShortCircuitBytesRead = rhs.getTotalShortCircuitBytesRead();
+ this.totalZeroCopyBytesRead = rhs.getTotalZeroCopyBytesRead();
}
/**
@@ -123,6 +142,13 @@ implements ByteBufferReadable, CanSetDro
public long getTotalShortCircuitBytesRead() {
return totalShortCircuitBytesRead;
}
+
+ /**
+ * @return The total number of zero-copy bytes read.
+ */
+ public long getTotalZeroCopyBytesRead() {
+ return totalZeroCopyBytesRead;
+ }
/**
* @return The total number of bytes read which were not local.
@@ -145,12 +171,21 @@ implements ByteBufferReadable, CanSetDro
this.totalLocalBytesRead += amt;
this.totalShortCircuitBytesRead += amt;
}
+
+ void addZeroCopyBytes(long amt) {
+ this.totalBytesRead += amt;
+ this.totalLocalBytesRead += amt;
+ this.totalShortCircuitBytesRead += amt;
+ this.totalZeroCopyBytesRead += amt;
+ }
private long totalBytesRead;
private long totalLocalBytesRead;
private long totalShortCircuitBytesRead;
+
+ private long totalZeroCopyBytesRead;
}
private final FileInputStreamCache fileInputStreamCache;
@@ -368,7 +403,7 @@ implements ByteBufferReadable, CanSetDro
//check offset
if (offset < 0 || offset >= getFileLength()) {
- throw new IOException("offset < 0 || offset > getFileLength(), offset="
+ throw new IOException("offset < 0 || offset >= getFileLength(), offset="
+ offset
+ ", updatePosition=" + updatePosition
+ ", locatedBlocks=" + locatedBlocks);
@@ -587,6 +622,20 @@ implements ByteBufferReadable, CanSetDro
}
dfsClient.checkOpen();
+ if (!extendedReadBuffers.isEmpty()) {
+ final StringBuilder builder = new StringBuilder();
+ extendedReadBuffers.visitAll(new IdentityHashStore.Visitor<ByteBuffer, Object>() {
+ private String prefix = "";
+ @Override
+ public void accept(ByteBuffer k, Object v) {
+ builder.append(prefix).append(k);
+ prefix = ", ";
+ }
+ });
+ DFSClient.LOG.warn("closing file " + src + ", but there are still " +
+ "unreleased ByteBuffers allocated by read(). " +
+ "Please release " + builder.toString() + ".");
+ }
if (blockReader != null) {
blockReader.close();
blockReader = null;
@@ -1393,4 +1442,100 @@ implements ByteBufferReadable, CanSetDro
this.cachingStrategy.setDropBehind(dropBehind);
closeCurrentBlockReader();
}
+
+ @Override
+ public synchronized ByteBuffer read(ByteBufferPool bufferPool,
+ int maxLength, EnumSet<ReadOption> opts)
+ throws IOException, UnsupportedOperationException {
+ assert(maxLength > 0);
+ if (((blockReader == null) || (blockEnd == -1)) &&
+ (pos < getFileLength())) {
+ /*
+ * If we don't have a blockReader, or the one we have has no more bytes
+ * left to read, we call seekToBlockSource to get a new blockReader and
+ * recalculate blockEnd. Note that we assume we're not at EOF here
+ * (we check this above).
+ */
+ if ((!seekToBlockSource(pos)) || (blockReader == null)) {
+ throw new IOException("failed to allocate new BlockReader " +
+ "at position " + pos);
+ }
+ }
+ boolean canSkipChecksums = opts.contains(ReadOption.SKIP_CHECKSUMS);
+ if (canSkipChecksums) {
+ ByteBuffer buffer = tryReadZeroCopy(maxLength);
+ if (buffer != null) {
+ return buffer;
+ }
+ }
+ ByteBuffer buffer = ByteBufferUtil.
+ fallbackRead(this, bufferPool, maxLength);
+ if (buffer != null) {
+ extendedReadBuffers.put(buffer, bufferPool);
+ }
+ return buffer;
+ }
+
+ private synchronized ByteBuffer tryReadZeroCopy(int maxLength)
+ throws IOException {
+ // Java ByteBuffers can't be longer than 2 GB, because they use
+ // 4-byte signed integers to represent capacity, etc.
+ // So we can't mmap the parts of the block higher than the 2 GB offset.
+ // FIXME: we could work around this with multiple memory maps.
+ // See HDFS-5101.
+ long blockEnd32 = Math.min(Integer.MAX_VALUE, blockEnd);
+ long curPos = pos;
+ long blockLeft = blockEnd32 - curPos + 1;
+ if (blockLeft <= 0) {
+ if (DFSClient.LOG.isDebugEnabled()) {
+ DFSClient.LOG.debug("unable to perform a zero-copy read from offset " +
+ curPos + " of " + src + "; blockLeft = " + blockLeft +
+ "; blockEnd32 = " + blockEnd32 + ", blockEnd = " + blockEnd +
+ "; maxLength = " + maxLength);
+ }
+ return null;
+ }
+ int length = Math.min((int)blockLeft, maxLength);
+ long blockStartInFile = currentLocatedBlock.getStartOffset();
+ long blockPos = curPos - blockStartInFile;
+ long limit = blockPos + length;
+ ClientMmap clientMmap =
+ blockReader.getClientMmap(currentLocatedBlock,
+ dfsClient.getMmapManager());
+ if (clientMmap == null) {
+ if (DFSClient.LOG.isDebugEnabled()) {
+ DFSClient.LOG.debug("unable to perform a zero-copy read from offset " +
+ curPos + " of " + src + "; BlockReader#getClientMmap returned " +
+ "null.");
+ }
+ return null;
+ }
+ seek(pos + length);
+ ByteBuffer buffer = clientMmap.getMappedByteBuffer().asReadOnlyBuffer();
+ buffer.position((int)blockPos);
+ buffer.limit((int)limit);
+ clientMmap.ref();
+ extendedReadBuffers.put(buffer, clientMmap);
+ readStatistics.addZeroCopyBytes(length);
+ if (DFSClient.LOG.isDebugEnabled()) {
+ DFSClient.LOG.debug("readZeroCopy read " + maxLength + " bytes from " +
+ "offset " + curPos + " via the zero-copy read path. " +
+ "blockEnd = " + blockEnd);
+ }
+ return buffer;
+ }
+
+ @Override
+ public synchronized void releaseBuffer(ByteBuffer buffer) {
+ Object val = extendedReadBuffers.remove(buffer);
+ if (val == null) {
+ throw new IllegalArgumentException("tried to release a buffer " +
+ "that was not created by this stream, " + buffer);
+ }
+ if (val instanceof ClientMmap) {
+ ((ClientMmap)val).unref();
+ } else if (val instanceof ByteBufferPool) {
+ ((ByteBufferPool)val).putBuffer(buffer);
+ }
+ }
}
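
read(ByteBufferPool, int, EnumSet<ReadOption>) and releaseBuffer() above form a strict checkout/return protocol: every buffer handed out, whether mmap-backed or pool-backed, must come back through releaseBuffer(), which is exactly what the warning in close() polices. A hedged usage sketch; it assumes FSDataInputStream forwards HasEnhancedByteBufferAccess to the wrapped DFSInputStream and that ElasticByteBufferPool is the available pool implementation:

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.util.EnumSet;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.ReadOption;
    import org.apache.hadoop.io.ByteBufferPool;
    import org.apache.hadoop.io.ElasticByteBufferPool;

    // Hedged sketch: the zero-copy checkout/return protocol. SKIP_CHECKSUMS
    // makes the mmap fast path eligible; otherwise fallbackRead() is used.
    class ZeroCopyReadSketch {
      static void readOnce(FSDataInputStream in) throws IOException {
        ByteBufferPool pool = new ElasticByteBufferPool();
        ByteBuffer buf = in.read(pool, 1024 * 1024,
            EnumSet.of(ReadOption.SKIP_CHECKSUMS));
        if (buf == null) {
          return;                  // EOF
        }
        try {
          // ... consume buf ...
        } finally {
          in.releaseBuffer(buf);   // returns the mmap ref or pool buffer
        }
      }
    }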
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java Wed Oct 30 22:21:59 2013
@@ -38,6 +38,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.CanSetDropBehind;
@@ -46,7 +47,6 @@ import org.apache.hadoop.fs.FSOutputSumm
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.Syncable;
-import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
@@ -85,7 +85,6 @@ import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Time;
-import org.mortbay.log.Log;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheBuilder;
@@ -138,10 +137,10 @@ public class DFSOutputStream extends FSO
private long currentSeqno = 0;
private long lastQueuedSeqno = -1;
private long lastAckedSeqno = -1;
- private long bytesCurBlock = 0; // bytes writen in current block
+ private long bytesCurBlock = 0; // bytes written in current block
private int packetSize = 0; // write packet size, not including the header.
private int chunksPerPacket = 0;
- private volatile IOException lastException = null;
+ private final AtomicReference<IOException> lastException = new AtomicReference<IOException>();
private long artificialSlowdown = 0;
private long lastFlushOffset = 0; // offset when flush was invoked
//persist blocks on namenode
@@ -466,8 +465,7 @@ public class DFSOutputStream extends FSO
}
}
- Packet one = null;
-
+ Packet one;
try {
// process datanode IO errors if any
boolean doSleep = false;
@@ -511,7 +509,7 @@ public class DFSOutputStream extends FSO
if(DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("Allocating new block");
}
- nodes = nextBlockOutputStream(src);
+ nodes = nextBlockOutputStream();
initDataStreaming();
} else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
if(DFSClient.LOG.isDebugEnabled()) {
@@ -575,9 +573,6 @@ public class DFSOutputStream extends FSO
}
lastPacket = Time.now();
- if (one.isHeartbeatPacket()) { //heartbeat packet
- }
-
// update bytesSent
long tmpBytesSent = one.getLastByteOffsetBlock();
if (bytesSent < tmpBytesSent) {
@@ -695,7 +690,7 @@ public class DFSOutputStream extends FSO
}
//
- // Processes reponses from the datanodes. A packet is removed
+ // Processes responses from the datanodes. A packet is removed
// from the ackQueue when its response arrives.
//
private class ResponseProcessor extends Daemon {
@@ -737,18 +732,18 @@ public class DFSOutputStream extends FSO
}
assert seqno != PipelineAck.UNKOWN_SEQNO :
- "Ack for unkown seqno should be a failed ack: " + ack;
+ "Ack for unknown seqno should be a failed ack: " + ack;
if (seqno == Packet.HEART_BEAT_SEQNO) { // a heartbeat ack
continue;
}
// a success ack for a data packet
- Packet one = null;
+ Packet one;
synchronized (dataQueue) {
one = ackQueue.getFirst();
}
if (one.seqno != seqno) {
- throw new IOException("Responseprocessor: Expecting seqno " +
+ throw new IOException("ResponseProcessor: Expecting seqno " +
" for block " + block +
one.seqno + " but received " + seqno);
}
@@ -814,8 +809,8 @@ public class DFSOutputStream extends FSO
if (++pipelineRecoveryCount > 5) {
DFSClient.LOG.warn("Error recovering pipeline for writing " +
block + ". Already retried 5 times for the same packet.");
- lastException = new IOException("Failing write. Tried pipeline " +
- "recovery 5 times without success.");
+ lastException.set(new IOException("Failing write. Tried pipeline " +
+ "recovery 5 times without success."));
streamerClosed = true;
return false;
}
@@ -1005,8 +1000,8 @@ public class DFSOutputStream extends FSO
}
}
if (nodes.length <= 1) {
- lastException = new IOException("All datanodes " + pipelineMsg
- + " are bad. Aborting...");
+ lastException.set(new IOException("All datanodes " + pipelineMsg
+ + " are bad. Aborting..."));
streamerClosed = true;
return false;
}
@@ -1021,7 +1016,7 @@ public class DFSOutputStream extends FSO
newnodes.length-errorIndex);
nodes = newnodes;
hasError = false;
- lastException = null;
+ lastException.set(null);
errorIndex = -1;
}
@@ -1057,7 +1052,7 @@ public class DFSOutputStream extends FSO
* Must get block ID and the IDs of the destinations from the namenode.
* Returns the list of target datanodes.
*/
- private DatanodeInfo[] nextBlockOutputStream(String client) throws IOException {
+ private DatanodeInfo[] nextBlockOutputStream() throws IOException {
LocatedBlock lb = null;
DatanodeInfo[] nodes = null;
int count = dfsClient.getConf().nBlockWriteRetry;
@@ -1065,7 +1060,7 @@ public class DFSOutputStream extends FSO
ExtendedBlock oldBlock = block;
do {
hasError = false;
- lastException = null;
+ lastException.set(null);
errorIndex = -1;
success = false;
@@ -1107,6 +1102,11 @@ public class DFSOutputStream extends FSO
//
private boolean createBlockOutputStream(DatanodeInfo[] nodes, long newGS,
boolean recoveryFlag) {
+ if (nodes.length == 0) {
+ DFSClient.LOG.info("nodes are empty for write pipeline of block "
+ + block);
+ return false;
+ }
Status pipelineStatus = SUCCESS;
String firstBadLink = "";
if (DFSClient.LOG.isDebugEnabled()) {
@@ -1215,8 +1215,7 @@ public class DFSOutputStream extends FSO
}
private LocatedBlock locateFollowingBlock(long start,
- DatanodeInfo[] excludedNodes)
- throws IOException, UnresolvedLinkException {
+ DatanodeInfo[] excludedNodes) throws IOException {
int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry;
long sleeptime = 400;
while (true) {
@@ -1279,9 +1278,7 @@ public class DFSOutputStream extends FSO
}
private void setLastException(IOException e) {
- if (lastException == null) {
- lastException = e;
- }
+ lastException.compareAndSet(null, e);
}
}
@@ -1289,7 +1286,7 @@ public class DFSOutputStream extends FSO
* Create a socket for a write pipeline
* @param first the first datanode
* @param length the pipeline length
- * @param client
+ * @param client client
* @return the socket connected to the first datanode
*/
static Socket createSocketForPipeline(final DatanodeInfo first,
@@ -1313,7 +1310,7 @@ public class DFSOutputStream extends FSO
protected void checkClosed() throws IOException {
if (closed) {
- IOException e = lastException;
+ IOException e = lastException.get();
throw e != null ? e : new ClosedChannelException();
}
}
@@ -1469,6 +1466,7 @@ public class DFSOutputStream extends FSO
private void waitAndQueueCurrentPacket() throws IOException {
synchronized (dataQueue) {
+ try {
// If queue is full, then wait till we have enough space
while (!closed && dataQueue.size() + ackQueue.size() > MAX_PACKETS) {
try {
@@ -1480,13 +1478,15 @@ public class DFSOutputStream extends FSO
//
// Rather than wait around for space in the queue, we should instead try to
// return to the caller as soon as possible, even though we slightly overrun
- // the MAX_PACKETS iength.
+ // the MAX_PACKETS length.
Thread.currentThread().interrupt();
break;
}
}
checkClosed();
queueCurrentPacket();
+ } catch (ClosedChannelException e) {
+ }
}
}
@@ -1704,7 +1704,7 @@ public class DFSOutputStream extends FSO
}
}
// If 1) any new blocks were allocated since the last flush, or 2) to
- // update length in NN is requried, then persist block locations on
+ // update length in NN is required, then persist block locations on
// namenode.
if (persistBlocks.getAndSet(false) || updateLength) {
try {
@@ -1735,7 +1735,7 @@ public class DFSOutputStream extends FSO
DFSClient.LOG.warn("Error while syncing", e);
synchronized (this) {
if (!closed) {
- lastException = new IOException("IOException flush:" + e);
+ lastException.set(new IOException("IOException flush:" + e));
closeThreads(true);
}
}
@@ -1793,21 +1793,25 @@ public class DFSOutputStream extends FSO
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("Waiting for ack for: " + seqno);
}
- synchronized (dataQueue) {
- while (!closed) {
- checkClosed();
- if (lastAckedSeqno >= seqno) {
- break;
- }
- try {
- dataQueue.wait(1000); // when we receive an ack, we notify on dataQueue
- } catch (InterruptedException ie) {
- throw new InterruptedIOException(
- "Interrupted while waiting for data to be acknowledged by pipeline");
+ try {
+ synchronized (dataQueue) {
+ while (!closed) {
+ checkClosed();
+ if (lastAckedSeqno >= seqno) {
+ break;
+ }
+ try {
+ dataQueue.wait(1000); // when we receive an ack, we notify on
+ // dataQueue
+ } catch (InterruptedException ie) {
+ throw new InterruptedIOException(
+ "Interrupted while waiting for data to be acknowledged by pipeline");
+ }
}
}
+ checkClosed();
+ } catch (ClosedChannelException e) {
}
- checkClosed();
}
private synchronized void start() {
@@ -1853,7 +1857,7 @@ public class DFSOutputStream extends FSO
@Override
public synchronized void close() throws IOException {
if (closed) {
- IOException e = lastException;
+ IOException e = lastException.getAndSet(null);
if (e == null)
return;
else
@@ -1880,6 +1884,7 @@ public class DFSOutputStream extends FSO
closeThreads(false);
completeFile(lastBlock);
dfsClient.endFileLease(src);
+ } catch (ClosedChannelException e) {
} finally {
closed = true;
}
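
Replacing the volatile lastException with an AtomicReference makes "record only the first failure" race-free: compareAndSet(null, e) can succeed exactly once, so concurrent writers cannot clobber the original error. A tiny self-contained illustration of the idiom (not code from this commit):

    import java.io.IOException;
    import java.util.concurrent.atomic.AtomicReference;

    // Hedged sketch: first-writer-wins error recording, as in setLastException().
    class FirstError {
      private final AtomicReference<IOException> last =
          new AtomicReference<IOException>();

      void record(IOException e) {
        last.compareAndSet(null, e);   // later errors are ignored atomically
      }

      void rethrowIfFailed() throws IOException {
        IOException e = last.get();
        if (e != null) {
          throw e;
        }
      }
    }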
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java Wed Oct 30 22:21:59 2013
@@ -38,6 +38,7 @@ import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.SecureRandom;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
@@ -599,6 +600,48 @@ public class DFSUtil {
Configuration conf) {
return getAddresses(conf, null, DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
}
+
+ /**
+ * Returns list of InetSocketAddress corresponding to HA NN HTTP addresses from
+ * the configuration.
+ *
+ * @param conf configuration
+ * @return list of InetSocketAddresses
+ */
+ public static Map<String, Map<String, InetSocketAddress>> getHaNnHttpAddresses(
+ Configuration conf) {
+ return getAddresses(conf, null, DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
+ }
+
+ /**
+ * Resolve an HDFS URL into real InetSocketAddresses. It works like a DNS resolver
+ * when the URL points to a non-HA cluster. When the URL points to an HA
+ * cluster, the resolver further resolves the logical name (i.e., the authority
+ * in the URL) into real namenode addresses.
+ */
+ public static InetSocketAddress[] resolve(URI uri, int schemeDefaultPort,
+ Configuration conf) throws IOException {
+ ArrayList<InetSocketAddress> ret = new ArrayList<InetSocketAddress>();
+
+ if (!HAUtil.isLogicalUri(conf, uri)) {
+ InetSocketAddress addr = NetUtils.createSocketAddr(uri.getAuthority(),
+ schemeDefaultPort);
+ ret.add(addr);
+
+ } else {
+ Map<String, Map<String, InetSocketAddress>> addresses = DFSUtil
+ .getHaNnHttpAddresses(conf);
+
+ for (Map<String, InetSocketAddress> addrs : addresses.values()) {
+ for (InetSocketAddress addr : addrs.values()) {
+ ret.add(addr);
+ }
+ }
+ }
+
+ InetSocketAddress[] r = new InetSocketAddress[ret.size()];
+ return ret.toArray(r);
+ }
/**
* Returns list of InetSocketAddress corresponding to backup node rpc
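
resolve() flattens both cases into a single address list: a DNS-style lookup of the authority for a non-HA URI, and the full set of configured NN HTTP addresses for a logical HA URI. A hedged usage sketch; the logical name and default port below are placeholders:

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSUtil;

    // Hedged sketch: resolving a possibly-logical HDFS URI into concrete
    // socket addresses. "mycluster" and port 50070 are placeholders.
    class ResolveSketch {
      static void printAddresses(Configuration conf) throws IOException {
        URI uri = URI.create("hdfs://mycluster");
        for (InetSocketAddress addr : DFSUtil.resolve(uri, 50070, conf)) {
          System.out.println(addr);
        }
      }
    }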
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java Wed Oct 30 22:21:59 2013
@@ -713,6 +713,7 @@ public class DistributedFileSystem exten
protected RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path p,
final PathFilter filter)
throws IOException {
+ final Path absF = fixRelativePart(p);
return new RemoteIterator<LocatedFileStatus>() {
private DirectoryListing thisListing;
private int i;
@@ -722,7 +723,7 @@ public class DistributedFileSystem exten
{ // initializer
// Fully resolve symlinks in path first to avoid additional resolution
// round-trips as we fetch more batches of listings
- src = getPathName(resolvePath(p));
+ src = getPathName(resolvePath(absF));
// fetch the first batch of entries in the directory
thisListing = dfs.listPaths(src, HdfsFileStatus.EMPTY_NAME, true);
statistics.incrementReadOps(1);
@@ -736,7 +737,7 @@ public class DistributedFileSystem exten
while (curStat == null && hasNextNoFilter()) {
LocatedFileStatus next =
((HdfsLocatedFileStatus)thisListing.getPartialListing()[i++])
- .makeQualifiedLocated(getUri(), p);
+ .makeQualifiedLocated(getUri(), absF);
if (filter.accept(next.getPath())) {
curStat = next;
}
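
The fix above qualifies the relative path once, via fixRelativePart(), so the initial resolvePath() and every later makeQualifiedLocated() agree on the same absolute path. A hedged sketch of the caller-visible behavior; the paths and working directory are hypothetical:

    import java.io.IOException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.LocatedFileStatus;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.RemoteIterator;

    // Hedged sketch: listing with a relative path. With the fix, "data" is
    // qualified against the working directory once and used consistently.
    class RelativeListSketch {
      static void list(FileSystem fs) throws IOException {
        fs.setWorkingDirectory(new Path("/user/alice"));
        RemoteIterator<LocatedFileStatus> it =
            fs.listLocatedStatus(new Path("data"));  // /user/alice/data
        while (it.hasNext()) {
          System.out.println(it.next().getPath());   // fully-qualified paths
        }
      }
    }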
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HAUtil.java Wed Oct 30 22:21:59 2013
@@ -17,15 +17,9 @@
*/
package org.apache.hadoop.hdfs;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Map;
-
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -41,11 +35,17 @@ import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
-import static org.apache.hadoop.hdfs.protocol.HdfsConstants.HA_DT_SERVICE_PREFIX;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
+import static org.apache.hadoop.hdfs.protocol.HdfsConstants.HA_DT_SERVICE_PREFIX;
public class HAUtil {
@@ -265,10 +265,15 @@ public class HAUtil {
tokenSelector.selectToken(haService, ugi.getTokens());
if (haToken != null) {
for (InetSocketAddress singleNNAddr : nnAddrs) {
+ // this is a minor hack to prevent physical HA tokens from being
+ // exposed to the user via UGI.getCredentials(), otherwise these
+ // cloned tokens may be inadvertently propagated to jobs
Token<DelegationTokenIdentifier> specificToken =
- new Token<DelegationTokenIdentifier>(haToken);
+ new Token.PrivateToken<DelegationTokenIdentifier>(haToken);
SecurityUtil.setTokenService(specificToken, singleNNAddr);
- ugi.addToken(specificToken);
+ Text alias =
+ new Text(HA_DT_SERVICE_PREFIX + "//" + specificToken.getService());
+ ugi.addToken(alias, specificToken);
LOG.debug("Mapped HA service delegation token for logical URI " +
haUri + " to namenode " + singleNNAddr);
}
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java Wed Oct 30 22:21:59 2013
@@ -17,10 +17,18 @@
*/
package org.apache.hadoop.hdfs;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY;
import java.io.IOException;
import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.HashMap;
@@ -48,6 +56,7 @@ import org.apache.hadoop.hdfs.server.pro
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.retry.DefaultFailoverProxyProvider;
import org.apache.hadoop.io.retry.FailoverProxyProvider;
+import org.apache.hadoop.io.retry.LossyRetryInvocationHandler;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;
@@ -144,6 +153,63 @@ public class NameNodeProxies {
return new ProxyAndInfo<T>(proxy, dtService);
}
}
+
+ /**
+ * Generate a dummy namenode proxy instance that utilizes our hacked
+ * {@link LossyRetryInvocationHandler}. A proxy instance generated using this
+ * method will proactively drop RPC responses. Currently this method only
+ * supports an HA setup; null will be returned if the given configuration is
+ * not for HA.
+ *
+ * @param config the configuration containing the required IPC
+ * properties, client failover configurations, etc.
+ * @param nameNodeUri the URI pointing either to a specific NameNode
+ * or to a logical nameservice.
+ * @param xface the IPC interface which should be created
+ * @param numResponseToDrop The number of responses to drop for each RPC call
+ * @return an object containing both the proxy and the associated
+ * delegation token service it corresponds to. Will return null if the
+ * given configuration does not support HA.
+ * @throws IOException if there is an error creating the proxy
+ */
+ @SuppressWarnings("unchecked")
+ public static <T> ProxyAndInfo<T> createProxyWithLossyRetryHandler(
+ Configuration config, URI nameNodeUri, Class<T> xface,
+ int numResponseToDrop) throws IOException {
+ Preconditions.checkArgument(numResponseToDrop > 0);
+ Class<FailoverProxyProvider<T>> failoverProxyProviderClass =
+ getFailoverProxyProviderClass(config, nameNodeUri, xface);
+ if (failoverProxyProviderClass != null) { // HA case
+ FailoverProxyProvider<T> failoverProxyProvider =
+ createFailoverProxyProvider(config, failoverProxyProviderClass,
+ xface, nameNodeUri);
+ int delay = config.getInt(
+ DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_KEY,
+ DFS_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT);
+ int maxCap = config.getInt(
+ DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY,
+ DFS_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT);
+ int maxFailoverAttempts = config.getInt(
+ DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY,
+ DFS_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT);
+ InvocationHandler dummyHandler = new LossyRetryInvocationHandler<T>(
+ numResponseToDrop, failoverProxyProvider,
+ RetryPolicies.failoverOnNetworkException(
+ RetryPolicies.TRY_ONCE_THEN_FAIL,
+ Math.max(numResponseToDrop + 1, maxFailoverAttempts), delay,
+ maxCap));
+
+ T proxy = (T) Proxy.newProxyInstance(
+ failoverProxyProvider.getInterface().getClassLoader(),
+ new Class[] { xface }, dummyHandler);
+ Text dtService = HAUtil.buildTokenServiceForLogicalUri(nameNodeUri);
+ return new ProxyAndInfo<T>(proxy, dtService);
+ } else {
+ LOG.warn("Currently creating proxy using " +
+ "LossyRetryInvocationHandler requires NN HA setup");
+ return null;
+ }
+ }
/**
* Creates an explicitly non-HA-enabled proxy object. Most of the time you
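
A usage sketch, not part of this commit, for the new createProxyWithLossyRetryHandler: requesting a ClientProtocol proxy that drops the first N responses of each call so client retry paths can be exercised against an HA pair. The logical URI and drop count are hypothetical, and the configuration is assumed to already carry an HA failover proxy provider.

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.NameNodeProxies;
    import org.apache.hadoop.hdfs.NameNodeProxies.ProxyAndInfo;
    import org.apache.hadoop.hdfs.protocol.ClientProtocol;

    public class LossyProxySketch {
      public static void main(String[] args) throws Exception {
        // Assumes dfs.client.failover.proxy.provider.<nameservice> is set.
        Configuration conf = new Configuration();
        URI nnUri = URI.create("hdfs://mycluster");    // hypothetical logical URI
        ProxyAndInfo<ClientProtocol> proxyInfo =
            NameNodeProxies.createProxyWithLossyRetryHandler(
                conf, nnUri, ClientProtocol.class, 2); // drop first 2 responses
        if (proxyInfo != null) {                       // null when not an HA config
          ClientProtocol namenode = proxyInfo.getProxy();
          // Each call's first two responses are dropped, so this succeeds
          // only after the retry policy kicks in.
          namenode.getFileInfo("/");
        }
      }
    }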
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java Wed Oct 30 22:21:59 2013
@@ -27,9 +27,12 @@ import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FSInputChecker;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.client.ClientMmap;
+import org.apache.hadoop.hdfs.client.ClientMmapManager;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
@@ -485,4 +488,10 @@ public class RemoteBlockReader extends F
public boolean isShortCircuit() {
return false;
}
+
+ @Override
+ public ClientMmap getClientMmap(LocatedBlock curBlock,
+ ClientMmapManager mmapManager) {
+ return null;
+ }
}
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java Wed Oct 30 22:21:59 2013
@@ -29,9 +29,12 @@ import java.nio.channels.ReadableByteCha
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.client.ClientMmap;
+import org.apache.hadoop.hdfs.client.ClientMmapManager;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
@@ -40,7 +43,6 @@ import org.apache.hadoop.hdfs.protocol.p
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
@@ -451,4 +453,10 @@ public class RemoteBlockReader2 impleme
public boolean isShortCircuit() {
return false;
}
+
+ @Override
+ public ClientMmap getClientMmap(LocatedBlock curBlock,
+ ClientMmapManager manager) {
+ return null;
+ }
}
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java Wed Oct 30 22:21:59 2013
@@ -43,6 +43,7 @@ public class DatanodeID implements Compa
private String storageID; // unique per cluster storageID
private int xferPort; // data streaming port
private int infoPort; // info server port
+ private int infoSecurePort; // secure info server port
private int ipcPort; // IPC server port
public DatanodeID(DatanodeID from) {
@@ -51,6 +52,7 @@ public class DatanodeID implements Compa
from.getStorageID(),
from.getXferPort(),
from.getInfoPort(),
+ from.getInfoSecurePort(),
from.getIpcPort());
this.peerHostName = from.getPeerHostName();
}
@@ -65,12 +67,13 @@ public class DatanodeID implements Compa
* @param ipcPort ipc server port
*/
public DatanodeID(String ipAddr, String hostName, String storageID,
- int xferPort, int infoPort, int ipcPort) {
+ int xferPort, int infoPort, int infoSecurePort, int ipcPort) {
this.ipAddr = ipAddr;
this.hostName = hostName;
this.storageID = storageID;
this.xferPort = xferPort;
this.infoPort = infoPort;
+ this.infoSecurePort = infoSecurePort;
this.ipcPort = ipcPort;
}
@@ -129,6 +132,13 @@ public class DatanodeID implements Compa
}
/**
+ * @return IP:infoSecurePort string
+ */
+ public String getInfoSecureAddr() {
+ return ipAddr + ":" + infoSecurePort;
+ }
+
+ /**
* @return hostname:xferPort
*/
public String getXferAddrWithHostname() {
@@ -180,6 +190,13 @@ public class DatanodeID implements Compa
}
/**
+ * @return infoSecurePort (the port at which the HTTPS server is bound)
+ */
+ public int getInfoSecurePort() {
+ return infoSecurePort;
+ }
+
+ /**
* @return ipcPort (the port at which the IPC server is bound)
*/
public int getIpcPort() {
@@ -218,13 +235,14 @@ public class DatanodeID implements Compa
peerHostName = nodeReg.getPeerHostName();
xferPort = nodeReg.getXferPort();
infoPort = nodeReg.getInfoPort();
+ infoSecurePort = nodeReg.getInfoSecurePort();
ipcPort = nodeReg.getIpcPort();
}
/**
* Compare based on data transfer address.
*
- * @param that
+ * @param that datanode to compare with
* @return as specified by Comparable
*/
@Override
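
A minimal sketch, assuming illustrative port numbers, of the widened DatanodeID constructor above: the new infoSecurePort argument sits between infoPort and ipcPort, and getInfoSecureAddr() reports IP:infoSecurePort.

    import org.apache.hadoop.hdfs.protocol.DatanodeID;

    public class DatanodeIDSketch {
      public static void main(String[] args) {
        DatanodeID dn = new DatanodeID(
            "10.0.0.1",          // ipAddr (hypothetical)
            "dn1.example.com",   // hostName (hypothetical)
            "DS-1234",           // storageID (hypothetical)
            50010,               // xferPort
            50075,               // infoPort (HTTP)
            50475,               // infoSecurePort (HTTPS), the new argument
            50020);              // ipcPort
        System.out.println(dn.getInfoSecureAddr());  // prints 10.0.0.1:50475
      }
    }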
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java Wed Oct 30 22:21:59 2013
@@ -17,10 +17,6 @@
*/
package org.apache.hadoop.hdfs.protocol;
-import static org.apache.hadoop.hdfs.DFSUtil.percent2String;
-
-import java.util.Date;
-
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -32,6 +28,10 @@ import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
+import java.util.Date;
+
+import static org.apache.hadoop.hdfs.DFSUtil.percent2String;
+
/**
* This class extends the primary identifier of a Datanode with ephemeral
* state, e.g. usage information, current administrative state, and the
@@ -108,18 +108,21 @@ public class DatanodeInfo extends Datano
final long capacity, final long dfsUsed, final long remaining,
final long blockPoolUsed, final long lastUpdate, final int xceiverCount,
final AdminStates adminState) {
- this(nodeID.getIpAddr(), nodeID.getHostName(), nodeID.getStorageID(), nodeID.getXferPort(),
- nodeID.getInfoPort(), nodeID.getIpcPort(), capacity, dfsUsed, remaining,
- blockPoolUsed, lastUpdate, xceiverCount, location, adminState);
+ this(nodeID.getIpAddr(), nodeID.getHostName(), nodeID.getStorageID(),
+ nodeID.getXferPort(), nodeID.getInfoPort(), nodeID.getInfoSecurePort(),
+ nodeID.getIpcPort(), capacity, dfsUsed, remaining, blockPoolUsed,
+ lastUpdate, xceiverCount, location, adminState);
}
/** Constructor */
public DatanodeInfo(final String ipAddr, final String hostName,
- final String storageID, final int xferPort, final int infoPort, final int ipcPort,
+ final String storageID, final int xferPort, final int infoPort,
+ final int infoSecurePort, final int ipcPort,
final long capacity, final long dfsUsed, final long remaining,
final long blockPoolUsed, final long lastUpdate, final int xceiverCount,
final String networkLocation, final AdminStates adminState) {
- super(ipAddr, hostName, storageID, xferPort, infoPort, ipcPort);
+ super(ipAddr, hostName, storageID, xferPort, infoPort,
+ infoSecurePort, ipcPort);
this.capacity = capacity;
this.dfsUsed = dfsUsed;
this.remaining = remaining;
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java Wed Oct 30 22:21:59 2013
@@ -222,7 +222,8 @@ public class PBHelper {
// DatanodeId
public static DatanodeID convert(DatanodeIDProto dn) {
return new DatanodeID(dn.getIpAddr(), dn.getHostName(), dn.getStorageID(),
- dn.getXferPort(), dn.getInfoPort(), dn.getIpcPort());
+ dn.getXferPort(), dn.getInfoPort(), dn.hasInfoSecurePort() ? dn
+ .getInfoSecurePort() : 0, dn.getIpcPort());
}
public static DatanodeIDProto convert(DatanodeID dn) {
@@ -232,6 +233,7 @@ public class PBHelper {
.setStorageID(dn.getStorageID())
.setXferPort(dn.getXferPort())
.setInfoPort(dn.getInfoPort())
+ .setInfoSecurePort(dn.getInfoSecurePort())
.setIpcPort(dn.getIpcPort()).build();
}
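
A hedged round-trip sketch for the PBHelper change above: infoSecurePort is an optional protobuf field, so IDs written by older code deserialize with the port defaulted to 0 via the hasInfoSecurePort() guard. Values below are illustrative.

    import org.apache.hadoop.hdfs.protocol.DatanodeID;
    import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto;
    import org.apache.hadoop.hdfs.protocolPB.PBHelper;

    public class DatanodeIDRoundTripSketch {
      public static void main(String[] args) {
        DatanodeID original = new DatanodeID("10.0.0.1", "dn1.example.com",
            "DS-1234", 50010, 50075, 50475, 50020);  // hypothetical values
        DatanodeIDProto proto = PBHelper.convert(original);
        DatanodeID back = PBHelper.convert(proto);
        // back.getInfoSecurePort() == 50475 here; for a proto produced by an
        // older writer, hasInfoSecurePort() is false and the port becomes 0.
        System.out.println(back.getInfoSecurePort());
      }
    }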
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSecretManager.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSecretManager.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSecretManager.java Wed Oct 30 22:21:59 2013
@@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.server.nam
import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StepType;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
@@ -58,6 +59,15 @@ public class DelegationTokenSecretManage
.getLog(DelegationTokenSecretManager.class);
private final FSNamesystem namesystem;
+
+ public DelegationTokenSecretManager(long delegationKeyUpdateInterval,
+ long delegationTokenMaxLifetime, long delegationTokenRenewInterval,
+ long delegationTokenRemoverScanInterval, FSNamesystem namesystem) {
+ this(delegationKeyUpdateInterval, delegationTokenMaxLifetime,
+ delegationTokenRenewInterval, delegationTokenRemoverScanInterval, false,
+ namesystem);
+ }
+
/**
* Create a secret manager
* @param delegationKeyUpdateInterval the number of seconds for rolling new
@@ -67,13 +77,16 @@ public class DelegationTokenSecretManage
* @param delegationTokenRenewInterval how often the tokens must be renewed
* @param delegationTokenRemoverScanInterval how often the tokens are scanned
* for expired tokens
+ * @param storeTokenTrackingId whether to store the token's tracking id
*/
public DelegationTokenSecretManager(long delegationKeyUpdateInterval,
long delegationTokenMaxLifetime, long delegationTokenRenewInterval,
- long delegationTokenRemoverScanInterval, FSNamesystem namesystem) {
+ long delegationTokenRemoverScanInterval, boolean storeTokenTrackingId,
+ FSNamesystem namesystem) {
super(delegationKeyUpdateInterval, delegationTokenMaxLifetime,
delegationTokenRenewInterval, delegationTokenRemoverScanInterval);
this.namesystem = namesystem;
+ this.storeTokenTrackingId = storeTokenTrackingId;
}
@Override //SecretManager
@@ -103,6 +116,24 @@ public class DelegationTokenSecretManage
return super.retrievePassword(identifier);
}
+ @Override
+ public byte[] retriableRetrievePassword(DelegationTokenIdentifier identifier)
+ throws InvalidToken, StandbyException, RetriableException, IOException {
+ namesystem.checkOperation(OperationCategory.READ);
+ try {
+ return super.retrievePassword(identifier);
+ } catch (InvalidToken it) {
+ if (namesystem.inTransitionToActive()) {
+ // if the namesystem is currently in the middle of transition to
+ // active state, let client retry since the corresponding editlog may
+ // have not been applied yet
+ throw new RetriableException(it);
+ } else {
+ throw it;
+ }
+ }
+ }
+
/**
* Returns expiry time of a token given its identifier.
*
@@ -184,7 +215,7 @@ public class DelegationTokenSecretManage
}
if (currentTokens.get(identifier) == null) {
currentTokens.put(identifier, new DelegationTokenInformation(expiryTime,
- password));
+ password, getTrackingIdIfEnabled(identifier)));
} else {
throw new IOException(
"Same delegation token being added twice; invalid entry in fsimage or editlogs");
@@ -223,7 +254,7 @@ public class DelegationTokenSecretManage
byte[] password = createPassword(identifier.getBytes(), allKeys
.get(keyId).getKey());
currentTokens.put(identifier, new DelegationTokenInformation(expiryTime,
- password));
+ password, getTrackingIdIfEnabled(identifier)));
}
}
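
A caller-side sketch, not part of this commit, of the contract behind retriableRetrievePassword: a RetriableException signals that the NameNode is still transitioning to active and the edits carrying the token may not be applied yet, so the lookup should be retried rather than failed. The backoff interval is illustrative; in practice the IPC layer drives the retry.

    import java.io.IOException;

    import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
    import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
    import org.apache.hadoop.ipc.RetriableException;

    public class RetriableLookupSketch {
      static byte[] lookupWithRetry(DelegationTokenSecretManager sm,
          DelegationTokenIdentifier id) throws IOException, InterruptedException {
        while (true) {
          try {
            return sm.retriableRetrievePassword(id);
          } catch (RetriableException e) {
            // NN is mid-transition to active; the token's edits may not be
            // applied yet. Back off and try again.
            Thread.sleep(100);                    // interval is illustrative
          }
        }
      }
    }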
Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Wed Oct 30 22:21:59 2013
@@ -506,7 +506,7 @@ public class Balancer {
final DatanodeInfo datanode;
final double utilization;
final long maxSize2Move;
- protected long scheduledSize = 0L;
+ private long scheduledSize = 0L;
// blocks being moved but not confirmed yet
private List<PendingBlockMove> pendingBlocks =
new ArrayList<PendingBlockMove>(MAX_NUM_CONCURRENT_MOVES);
@@ -555,20 +555,35 @@ public class Balancer {
}
/** Decide if more bytes still need to be moved */
- protected boolean hasSpaceForScheduling() {
+ protected synchronized boolean hasSpaceForScheduling() {
return scheduledSize<maxSize2Move;
}
/** Return the total number of bytes that need to be moved */
- protected long availableSizeToMove() {
+ protected synchronized long availableSizeToMove() {
return maxSize2Move-scheduledSize;
}
- /* increment scheduled size */
- protected void incScheduledSize(long size) {
+ /** increment scheduled size */
+ protected synchronized void incScheduledSize(long size) {
scheduledSize += size;
}
+ /** decrement scheduled size */
+ protected synchronized void decScheduledSize(long size) {
+ scheduledSize -= size;
+ }
+
+ /** get scheduled size */
+ protected synchronized long getScheduledSize() {
+ return scheduledSize;
+ }
+
+ /** set scheduled size */
+ protected synchronized void setScheduledSize(long size) {
+ scheduledSize = size;
+ }
+
/* Check if the node can schedule more blocks to move */
synchronized private boolean isPendingQNotFull() {
if ( pendingBlocks.size() < MAX_NUM_CONCURRENT_MOVES ) {
@@ -702,8 +717,8 @@ public class Balancer {
pendingBlock.source = this;
pendingBlock.target = target;
if ( pendingBlock.chooseBlockAndProxy() ) {
- long blockSize = pendingBlock.block.getNumBytes();
- scheduledSize -= blockSize;
+ long blockSize = pendingBlock.block.getNumBytes();
+ decScheduledSize(blockSize);
task.size -= blockSize;
if (task.size == 0) {
tasks.remove();
@@ -747,10 +762,11 @@ public class Balancer {
private static final long MAX_ITERATION_TIME = 20*60*1000L; //20 mins
private void dispatchBlocks() {
long startTime = Time.now();
+ long scheduledSize = getScheduledSize();
this.blocksToReceive = 2*scheduledSize;
boolean isTimeUp = false;
int noPendingBlockIteration = 0;
- while(!isTimeUp && scheduledSize>0 &&
+ while(!isTimeUp && getScheduledSize()>0 &&
(!srcBlockList.isEmpty() || blocksToReceive>0)) {
PendingBlockMove pendingBlock = chooseNextBlockToMove();
if (pendingBlock != null) {
@@ -779,7 +795,7 @@ public class Balancer {
// in case no blocks can be moved for source node's task,
// jump out of while-loop after 5 iterations.
if (noPendingBlockIteration >= MAX_NO_PENDING_BLOCK_ITERATIONS) {
- scheduledSize = 0;
+ setScheduledSize(0);
}
}
@@ -992,7 +1008,7 @@ public class Balancer {
long bytesToMove = 0L;
for (Source src : sources) {
- bytesToMove += src.scheduledSize;
+ bytesToMove += src.getScheduledSize();
}
return bytesToMove;
}
@@ -1093,7 +1109,7 @@ public class Balancer {
bytesMoved += bytes;
}
- private long get() {
+ private synchronized long get() {
return bytesMoved;
}
};
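
A minimal sketch of the locking pattern the Balancer hunks above introduce: scheduledSize is now private and every read or write goes through a synchronized accessor, so dispatcher threads and the scheduling loop in dispatchBlocks() never race on it. The class below is illustrative, not Balancer code.

    public class ScheduledSizeSketch {
      private final long maxSize2Move;
      private long scheduledSize = 0L;   // guarded by "this"

      ScheduledSizeSketch(long maxSize2Move) {
        this.maxSize2Move = maxSize2Move;
      }

      synchronized boolean hasSpaceForScheduling() {
        return scheduledSize < maxSize2Move;
      }

      synchronized long availableSizeToMove() {
        return maxSize2Move - scheduledSize;
      }

      synchronized void incScheduledSize(long size) { scheduledSize += size; }

      synchronized void decScheduledSize(long size) { scheduledSize -= size; }

      synchronized long getScheduledSize() { return scheduledSize; }

      synchronized void setScheduledSize(long size) { scheduledSize = size; }
    }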