You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by at...@apache.org on 2011/10/07 01:26:15 UTC
svn commit: r1179896 - in
/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/org/apache/hadoop/fs/ src/main/java/org/apache/hadoop/hdfs/
src/main/java/org/apache/hadoop/hdfs/protocol/
src/main/java/org/apache/hadoop/hdf...
Author: atm
Date: Thu Oct 6 23:26:14 2011
New Revision: 1179896
URL: http://svn.apache.org/viewvc?rev=1179896&view=rev
Log:
HDFS-1973. HA: HDFS clients must handle namenode failover and switch over to the new active namenode. (atm)
Added:
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientFailover.java
Modified:
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt?rev=1179896&r1=1179895&r2=1179896&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-1623.txt Thu Oct 6 23:26:14 2011
@@ -9,3 +9,5 @@ HDFS-2179. Add fencing framework and mec
HDFS-1974. Introduce active and standy states to the namenode. (suresh)
HDFS-2407. getServerDefaults and getStats don't check operation category (atm)
+
+HDFS-1973. HA: HDFS clients must handle namenode failover and switch over to the new active namenode. (atm)
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java?rev=1179896&r1=1179895&r2=1179896&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/Hdfs.java Thu Oct 6 23:26:14 2011
@@ -80,8 +80,7 @@ public class Hdfs extends AbstractFileSy
throw new IOException("Incomplete HDFS URI, no host: " + theUri);
}
- InetSocketAddress namenode = NameNode.getAddress(theUri.getAuthority());
- this.dfs = new DFSClient(namenode, conf, getStatistics());
+ this.dfs = new DFSClient(theUri, conf, getStatistics());
}
@Override
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1179896&r1=1179895&r2=1179896&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Thu Oct 6 23:26:14 2011
@@ -1,4 +1,3 @@
-
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -26,11 +25,11 @@ import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
+import java.net.URI;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-
import javax.net.SocketFactory;
import org.apache.commons.logging.Log;
@@ -87,6 +86,9 @@ import org.apache.hadoop.io.EnumSetWrita
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.retry.FailoverProxyProvider;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
@@ -96,6 +98,7 @@ import org.apache.hadoop.security.UserGr
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.ReflectionUtils;
/********************************************************
* DFSClient can connect to a Hadoop Filesystem and
@@ -199,7 +202,7 @@ public class DFSClient implements java.i
*/
private final Map<String, DFSOutputStream> filesBeingWritten
= new HashMap<String, DFSOutputStream>();
-
+
/**
* Same as this(NameNode.getAddress(conf), conf);
* @see #DFSClient(InetSocketAddress, Configuration)
@@ -209,12 +212,16 @@ public class DFSClient implements java.i
public DFSClient(Configuration conf) throws IOException {
this(NameNode.getAddress(conf), conf);
}
+
+ public DFSClient(InetSocketAddress address, Configuration conf) throws IOException {
+ this(NameNode.getUri(address), conf);
+ }
/**
* Same as this(nameNodeAddr, conf, null);
* @see #DFSClient(InetSocketAddress, Configuration, org.apache.hadoop.fs.FileSystem.Statistics)
*/
- public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf
+ public DFSClient(URI nameNodeAddr, Configuration conf
) throws IOException {
this(nameNodeAddr, conf, null);
}
@@ -223,17 +230,17 @@ public class DFSClient implements java.i
* Same as this(nameNodeAddr, null, conf, stats);
* @see #DFSClient(InetSocketAddress, ClientProtocol, Configuration, org.apache.hadoop.fs.FileSystem.Statistics)
*/
- public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf,
+ public DFSClient(URI nameNodeAddr, Configuration conf,
FileSystem.Statistics stats)
throws IOException {
this(nameNodeAddr, null, conf, stats);
}
-
+
/**
* Create a new DFSClient connected to the given nameNodeAddr or rpcNamenode.
* Exactly one of nameNodeAddr or rpcNamenode must be null.
*/
- DFSClient(InetSocketAddress nameNodeAddr, ClientProtocol rpcNamenode,
+ DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
Configuration conf, FileSystem.Statistics stats)
throws IOException {
// Copy only the required DFSClient configuration
@@ -246,20 +253,45 @@ public class DFSClient implements java.i
// The hdfsTimeout is currently the same as the ipc timeout
this.hdfsTimeout = Client.getTimeout(conf);
this.ugi = UserGroupInformation.getCurrentUser();
- final String authority = nameNodeAddr == null? "null":
- nameNodeAddr.getHostName() + ":" + nameNodeAddr.getPort();
+
+ final String authority = nameNodeUri == null? "null": nameNodeUri.getAuthority();
this.leaserenewer = LeaseRenewer.getInstance(authority, ugi, this);
this.clientName = leaserenewer.getClientName(dfsClientConf.taskId);
+
this.socketCache = new SocketCache(dfsClientConf.socketCacheCapacity);
- if (nameNodeAddr != null && rpcNamenode == null) {
- this.namenode = DFSUtil.createNamenode(nameNodeAddr, conf);
- } else if (nameNodeAddr == null && rpcNamenode != null) {
+
+ Class<?> failoverProxyProviderClass = getFailoverProxyProviderClass(authority, conf);
+
+ if (nameNodeUri != null && failoverProxyProviderClass != null) {
+ FailoverProxyProvider failoverProxyProvider = (FailoverProxyProvider)
+ ReflectionUtils.newInstance(failoverProxyProviderClass, conf);
+ this.namenode = (ClientProtocol)RetryProxy.create(ClientProtocol.class,
+ failoverProxyProvider, RetryPolicies.failoverOnNetworkException(1));
+ } else if (nameNodeUri != null && rpcNamenode == null) {
+ this.namenode = DFSUtil.createNamenode(NameNode.getAddress(nameNodeUri), conf);
+ } else if (nameNodeUri == null && rpcNamenode != null) {
//This case is used for testing.
this.namenode = rpcNamenode;
} else {
throw new IllegalArgumentException(
"Expecting exactly one of nameNodeAddr and rpcNamenode being null: "
- + "nameNodeAddr=" + nameNodeAddr + ", rpcNamenode=" + rpcNamenode);
+ + "nameNodeAddr=" + nameNodeUri + ", rpcNamenode=" + rpcNamenode);
+ }
+ }
+
+ private Class<?> getFailoverProxyProviderClass(String authority, Configuration conf)
+ throws IOException {
+ String configKey = DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "." + authority;
+ try {
+ return conf.getClass(configKey, null);
+ } catch (RuntimeException e) {
+ if (e.getCause() instanceof ClassNotFoundException) {
+ throw new IOException("Could not load failover proxy provider class "
+ + conf.get(configKey) + " which is configured for authority " + authority,
+ e);
+ } else {
+ throw e;
+ }
}
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1179896&r1=1179895&r2=1179896&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Thu Oct 6 23:26:14 2011
@@ -46,6 +46,7 @@ public class DFSConfigKeys extends Commo
public static final String DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_DEFAULT = "DEFAULT";
public static final String DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY = "dfs.client.socketcache.capacity";
public static final int DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT = 16;
+ public static final String DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX = "dfs.client.failover.proxy.provider";
public static final String DFS_NAMENODE_BACKUP_ADDRESS_KEY = "dfs.namenode.backup.address";
public static final String DFS_NAMENODE_BACKUP_ADDRESS_DEFAULT = "localhost:50100";
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java?rev=1179896&r1=1179895&r2=1179896&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java Thu Oct 6 23:26:14 2011
@@ -617,15 +617,19 @@ public class DFSUtil {
}
/** Create a {@link NameNode} proxy */
- public static ClientProtocol createNamenode( InetSocketAddress nameNodeAddr,
+ public static ClientProtocol createNamenode(InetSocketAddress nameNodeAddr,
Configuration conf) throws IOException {
- return createNamenode(createRPCNamenode(nameNodeAddr, conf,
- UserGroupInformation.getCurrentUser()));
-
+ return createNamenode(nameNodeAddr, conf, UserGroupInformation.getCurrentUser());
+ }
+
+ /** Create a {@link NameNode} proxy */
+ public static ClientProtocol createNamenode(InetSocketAddress nameNodeAddr,
+ Configuration conf, UserGroupInformation ugi) throws IOException {
+ return createNamenode(createRPCNamenode(nameNodeAddr, conf, ugi));
}
/** Create a {@link NameNode} proxy */
- static ClientProtocol createRPCNamenode(InetSocketAddress nameNodeAddr,
+ public static ClientProtocol createRPCNamenode(InetSocketAddress nameNodeAddr,
Configuration conf, UserGroupInformation ugi)
throws IOException {
return (ClientProtocol)RPC.getProxy(ClientProtocol.class,
@@ -634,7 +638,7 @@ public class DFSUtil {
}
/** Create a {@link NameNode} proxy */
- static ClientProtocol createNamenode(ClientProtocol rpcNamenode)
+ public static ClientProtocol createNamenode(ClientProtocol rpcNamenode)
throws IOException {
RetryPolicy createPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
5, HdfsConstants.LEASE_SOFTLIMIT_PERIOD, TimeUnit.MILLISECONDS);
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=1179896&r1=1179895&r2=1179896&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java Thu Oct 6 23:26:14 2011
@@ -106,8 +106,7 @@ public class DistributedFileSystem exten
throw new IOException("Incomplete HDFS URI, no host: "+ uri);
}
- InetSocketAddress namenode = NameNode.getAddress(uri.getAuthority());
- this.dfs = new DFSClient(namenode, conf, statistics);
+ this.dfs = new DFSClient(uri, conf, statistics);
this.uri = URI.create(HdfsConstants.HDFS_URI_SCHEME + "://" + uri.getAuthority());
this.workingDir = getHomeDirectory();
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=1179896&r1=1179895&r2=1179896&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Thu Oct 6 23:26:14 2011
@@ -41,6 +41,7 @@ import org.apache.hadoop.hdfs.server.nam
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.retry.Idempotent;
import org.apache.hadoop.ipc.VersionedProtocol;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.KerberosInfo;
@@ -99,6 +100,7 @@ public interface ClientProtocol extends
* @throws IOException If an I/O error occurred
*/
@Nullable
+ @Idempotent
public LocatedBlocks getBlockLocations(String src,
long offset,
long length)
@@ -249,7 +251,7 @@ public interface ClientProtocol extends
UnresolvedLinkException, IOException;
/**
- * The client can give up on a blcok by calling abandonBlock().
+ * The client can give up on a block by calling abandonBlock().
* The client can then
* either obtain a new block, or complete or abandon the file.
* Any partial writes to the block will be discarded.
@@ -721,6 +723,7 @@ public interface ClientProtocol extends
* @throws IOException If an I/O error occurred
*/
@Nullable
+ @Idempotent
public HdfsFileStatus getFileInfo(String src) throws AccessControlException,
FileNotFoundException, UnresolvedLinkException, IOException;
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1179896&r1=1179895&r2=1179896&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Thu Oct 6 23:26:14 2011
@@ -267,7 +267,7 @@ public class NameNode {
* @param filesystemURI
* @return address of file system
*/
- static InetSocketAddress getAddress(URI filesystemURI) {
+ public static InetSocketAddress getAddress(URI filesystemURI) {
String authority = filesystemURI.getAuthority();
if (authority == null) {
throw new IllegalArgumentException(String.format(
Added: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java?rev=1179896&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java (added)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java Thu Oct 6 23:26:14 2011
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.ha;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.io.retry.FailoverProxyProvider;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * A FailoverProxyProvider implementation which allows one to configure two URIs
+ * to connect to during fail-over. The first configured address is tried first,
+ * and on a fail-over event the other address is tried.
+ */
+public class ConfiguredFailoverProxyProvider implements FailoverProxyProvider,
+ Configurable {
+
+ public static final String CONFIGURED_NAMENODE_ADDRESSES
+ = "dfs.ha.namenode.addresses";
+
+ private static final Log LOG =
+ LogFactory.getLog(ConfiguredFailoverProxyProvider.class);
+
+ private Configuration conf;
+ private int currentProxyIndex = 0;
+ private List<AddressRpcProxyPair> proxies = new ArrayList<AddressRpcProxyPair>();
+ private UserGroupInformation ugi;
+
+ @Override
+ public Class<?> getInterface() {
+ return ClientProtocol.class;
+ }
+
+ /**
+ * Lazily initialize the RPC proxy object.
+ */
+ @Override
+ public synchronized Object getProxy() {
+ AddressRpcProxyPair current = proxies.get(currentProxyIndex);
+ if (current.namenode == null) {
+ try {
+ current.namenode = DFSUtil.createRPCNamenode(current.address, conf, ugi);
+ } catch (IOException e) {
+ LOG.error("Failed to create RPC proxy to NameNode", e);
+ throw new RuntimeException(e);
+ }
+ }
+ return current.namenode;
+ }
+
+ @Override
+ public synchronized void performFailover(Object currentProxy) {
+ currentProxyIndex = (currentProxyIndex + 1) % proxies.size();
+ }
+
+ @Override
+ public synchronized Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public synchronized void setConf(Configuration conf) {
+ this.conf = conf;
+ try {
+ ugi = UserGroupInformation.getCurrentUser();
+
+ Collection<String> addresses = conf.getTrimmedStringCollection(
+ CONFIGURED_NAMENODE_ADDRESSES);
+ if (addresses == null || addresses.size() == 0) {
+ throw new RuntimeException(this.getClass().getSimpleName() +
+ " is configured but " + CONFIGURED_NAMENODE_ADDRESSES +
+ " is not set.");
+ }
+ for (String address : addresses) {
+ proxies.add(new AddressRpcProxyPair(
+ NameNode.getAddress(new URI(address).getAuthority())));
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } catch (URISyntaxException e) {
+ throw new RuntimeException("Malformed URI set in " +
+ CONFIGURED_NAMENODE_ADDRESSES, e);
+ }
+ }
+
+ /**
+ * A little pair object to store the address and connected RPC proxy object to
+ * an NN. Note that {@link AddressRpcProxyPair#namenode} may be null.
+ */
+ private static class AddressRpcProxyPair {
+ public InetSocketAddress address;
+ public ClientProtocol namenode;
+
+ public AddressRpcProxyPair(InetSocketAddress address) {
+ this.address = address;
+ }
+ }
+
+ /**
+ * Close all the proxy objects which have been opened over the lifetime of
+ * this proxy provider.
+ */
+ @Override
+ public synchronized void close() throws IOException {
+ for (AddressRpcProxyPair proxy : proxies) {
+ if (proxy.namenode != null) {
+ RPC.stopProxy(proxy.namenode);
+ }
+ }
+ }
+}
Added: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientFailover.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientFailover.java?rev=1179896&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientFailover.java (added)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientFailover.java Thu Oct 6 23:26:14 2011
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestDFSClientFailover {
+
+ private static final Path TEST_FILE = new Path("/tmp/failover-test-file");
+ private static final int FILE_LENGTH_TO_VERIFY = 100;
+
+ private Configuration conf = new Configuration();
+ private MiniDFSCluster cluster;
+
+ @Before
+ public void setUpCluster() throws IOException {
+ cluster = new MiniDFSCluster.Builder(conf).numNameNodes(2).build();
+ cluster.waitActive();
+ }
+
+ @After
+ public void tearDownCluster() throws IOException {
+ cluster.shutdown();
+ }
+
+ // TODO(HA): This test should probably be made to fail if a client fails over
+ // to talk to an NN with a different block pool id. Once failover between
+ // active/standy in a single block pool is implemented, this test should be
+ // changed to exercise that.
+ @Test
+ public void testDfsClientFailover() throws IOException, URISyntaxException {
+ final String nameServiceId = "name-service-uri";
+ InetSocketAddress nnAddr1 = cluster.getNameNode(0).getNameNodeAddress();
+ InetSocketAddress nnAddr2 = cluster.getNameNode(1).getNameNodeAddress();
+
+ ClientProtocol nn1 = DFSUtil.createNamenode(nnAddr1, conf);
+ ClientProtocol nn2 = DFSUtil.createNamenode(nnAddr2, conf);
+
+ DFSClient dfsClient1 = new DFSClient(null, nn1, conf, null);
+ DFSClient dfsClient2 = new DFSClient(null, nn2, conf, null);
+
+ OutputStream out1 = dfsClient1.create(TEST_FILE.toString(), false);
+ OutputStream out2 = dfsClient2.create(TEST_FILE.toString(), false);
+ AppendTestUtil.write(out1, 0, FILE_LENGTH_TO_VERIFY);
+ AppendTestUtil.write(out2, 0, FILE_LENGTH_TO_VERIFY);
+ out1.close();
+ out2.close();
+
+ String address1 = "hdfs://" + nnAddr1.getHostName() + ":" + nnAddr1.getPort();
+ String address2 = "hdfs://" + nnAddr2.getHostName() + ":" + nnAddr2.getPort();
+ conf.set(ConfiguredFailoverProxyProvider.CONFIGURED_NAMENODE_ADDRESSES,
+ address1 + "," + address2);
+
+ conf.set(DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "." + nameServiceId,
+ ConfiguredFailoverProxyProvider.class.getName());
+
+ FileSystem fs = FileSystem.get(new URI("hdfs://" + nameServiceId), conf);
+
+ AppendTestUtil.check(fs, TEST_FILE, FILE_LENGTH_TO_VERIFY);
+ cluster.getNameNode(0).stop();
+ AppendTestUtil.check(fs, TEST_FILE, FILE_LENGTH_TO_VERIFY);
+
+ fs.close();
+ }
+
+}
\ No newline at end of file