Posted to hdfs-commits@hadoop.apache.org by el...@apache.org on 2012/04/03 01:20:35 UTC

svn commit: r1308617 - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/main/resources/ src/test/java/org/apache/hadoop/hdfs/

Author: eli
Date: Mon Apr  2 23:20:34 2012
New Revision: 1308617

URL: http://svn.apache.org/viewvc?rev=1308617&view=rev
Log:
HDFS-3148. The client should be able to use multiple local interfaces for data transfer. Contributed by Eli Collins

Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1308617&r1=1308616&r2=1308617&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Mon Apr  2 23:20:34 2012
@@ -179,6 +179,9 @@ Release 2.0.0 - UNRELEASED 
 
     HDFS-3167. CLI-based driver for MiniDFSCluster. (Henry Robinson via atm)
 
+    HDFS-3148. The client should be able to use multiple local interfaces
+    for data transfer. (eli)
+
   IMPROVEMENTS
 
     HDFS-2018. Move all journal stream management code into one place.

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1308617&r1=1308616&r2=1308617&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Mon Apr  2 23:20:34 2012
@@ -57,12 +57,16 @@ import java.io.OutputStream;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.Socket;
+import java.net.SocketAddress;
 import java.net.URI;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 
 import javax.net.SocketFactory;
 
@@ -123,6 +127,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -132,7 +137,9 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Progressable;
 
+import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
+import com.google.common.net.InetAddresses;
 
 /********************************************************
  * DFSClient can connect to a Hadoop Filesystem and 
@@ -168,6 +175,8 @@ public class DFSClient implements java.i
   final LeaseRenewer leaserenewer;
   final SocketCache socketCache;
   final Conf dfsClientConf;
+  private Random r = new Random();
+  private SocketAddress[] localInterfaceAddrs;
 
   /**
    * DFSClient configuration 
@@ -361,6 +370,68 @@ public class DFSClient implements java.i
     if (LOG.isDebugEnabled()) {
       LOG.debug("Short circuit read is " + shortCircuitLocalReads);
     }
+    String localInterfaces[] =
+      conf.getTrimmedStrings(DFSConfigKeys.DFS_CLIENT_LOCAL_INTERFACES);
+    localInterfaceAddrs = getLocalInterfaceAddrs(localInterfaces);
+    if (LOG.isDebugEnabled() && 0 != localInterfaces.length) {
+      LOG.debug("Using local interfaces [" +
+      Joiner.on(',').join(localInterfaces)+ "] with addresses [" +
+      Joiner.on(',').join(localInterfaceAddrs) + "]");
+    }
+  }
+
+  /**
+   * Return the socket addresses to use with each configured
+   * local interface. Local interfaces may be specified by IP
+   * address, IP address range using CIDR notation, interface
+   * name (e.g. eth0) or sub-interface name (e.g. eth0:0).
+   * The socket addresses consist of the IPs for the interfaces
+   * and the ephemeral port (port 0). If an IP, IP range, or
+   * interface name matches an interface with sub-interfaces,
+   * only the IP of the interface is used. Sub-interfaces can
+   * be used by specifying them explicitly (by IP or name).
+   * 
+   * @return SocketAddresses for the configured local interfaces,
+   *    or an empty array if none are configured
+   * @throws UnknownHostException if a given interface name is invalid
+   */
+  private static SocketAddress[] getLocalInterfaceAddrs(
+      String interfaceNames[]) throws UnknownHostException {
+    List<SocketAddress> localAddrs = new ArrayList<SocketAddress>();
+    for (String interfaceName : interfaceNames) {
+      if (InetAddresses.isInetAddress(interfaceName)) {
+        localAddrs.add(new InetSocketAddress(interfaceName, 0));
+      } else if (NetUtils.isValidSubnet(interfaceName)) {
+        for (InetAddress addr : NetUtils.getIPs(interfaceName, false)) {
+          localAddrs.add(new InetSocketAddress(addr, 0));
+        }
+      } else {
+        for (String ip : DNS.getIPs(interfaceName, false)) {
+          localAddrs.add(new InetSocketAddress(ip, 0));
+        }
+      }
+    }
+    return localAddrs.toArray(new SocketAddress[localAddrs.size()]);
+  }
+
+  /**
+   * Select one of the configured local interfaces at random. We use a random
+   * interface because other policies like round-robin are less effective
+   * given that we cache connections to datanodes.
+   *
+   * @return one of the local interface addresses at random, or null if no
+   *    local interfaces are configured
+   */
+  SocketAddress getRandomLocalInterfaceAddr() {
+    if (localInterfaceAddrs.length == 0) {
+      return null;
+    }
+    final int idx = r.nextInt(localInterfaceAddrs.length);
+    final SocketAddress addr = localInterfaceAddrs[idx];
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Using local interface " + addr);
+    }
+    return addr;
   }
 
   /**

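As context for the getLocalInterfaceAddrs() javadoc above, the interface-name case can be sketched outside Hadoop with plain java.net. The class and method names below are illustrative only; the commit itself additionally accepts literal IPs (via InetAddresses.isInetAddress) and CIDR ranges (via NetUtils.isValidSubnet), which this sketch omits.

    import java.net.InetAddress;
    import java.net.InetSocketAddress;
    import java.net.NetworkInterface;
    import java.net.SocketAddress;
    import java.net.SocketException;
    import java.net.UnknownHostException;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    public class LocalInterfaceResolver {
      /** Resolve an interface name (e.g. "eth0") to its addresses on port 0. */
      static List<SocketAddress> resolve(String interfaceName)
          throws SocketException, UnknownHostException {
        NetworkInterface iface = NetworkInterface.getByName(interfaceName);
        if (iface == null) {
          throw new UnknownHostException("No such interface " + interfaceName);
        }
        List<SocketAddress> addrs = new ArrayList<SocketAddress>();
        // Port 0 selects an ephemeral port, as in the commit's javadoc.
        for (InetAddress addr : Collections.list(iface.getInetAddresses())) {
          addrs.add(new InetSocketAddress(addr, 0));
        }
        return addrs;
      }

      public static void main(String[] args) throws Exception {
        // "lo" mirrors the loopback interface used by the test below.
        System.out.println(resolve(args.length > 0 ? args[0] : "lo"));
      }
    }
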
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1308617&r1=1308616&r2=1308617&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Mon Apr  2 23:20:34 2012
@@ -197,6 +197,7 @@ public class DFSConfigKeys extends Commo
   public static final String  DFS_NAMENODE_CHECKPOINT_EDITS_DIR_KEY = "dfs.namenode.checkpoint.edits.dir";
   public static final String  DFS_HOSTS = "dfs.hosts";
   public static final String  DFS_HOSTS_EXCLUDE = "dfs.hosts.exclude";
+  public static final String  DFS_CLIENT_LOCAL_INTERFACES = "dfs.client.local.interfaces";
 
   // Much code in hdfs is not yet updated to use these keys.
   public static final String  DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY = "dfs.client.block.write.locateFollowingBlock.retries";

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1308617&r1=1308616&r2=1308617&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java Mon Apr  2 23:20:34 2012
@@ -850,7 +850,9 @@ public class DFSInputStream extends FSIn
         // disaster.
         sock.setTcpNoDelay(true);
 
-        NetUtils.connect(sock, dnAddr, dfsClient.getConf().socketTimeout);
+        NetUtils.connect(sock, dnAddr,
+            dfsClient.getRandomLocalInterfaceAddr(),
+            dfsClient.getConf().socketTimeout);
         sock.setSoTimeout(dfsClient.getConf().socketTimeout);
       }
 

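Both the read path above and the write path below pass the randomly chosen local address to a four-argument NetUtils.connect overload. Conceptually this amounts to binding the socket to that local address before connecting, so outgoing datanode traffic leaves via the chosen interface. A minimal plain-java.net sketch of that bind-then-connect sequence (the addresses, port, and timeout are placeholders, not taken from the commit):

    import java.net.InetSocketAddress;
    import java.net.Socket;

    public class BindThenConnect {
      public static void main(String[] args) throws Exception {
        Socket sock = new Socket();
        // Bind to a chosen local interface; port 0 lets the OS pick the
        // source port. 127.0.0.1 stands in for getRandomLocalInterfaceAddr().
        sock.bind(new InetSocketAddress("127.0.0.1", 0));
        // Placeholder datanode address and timeout; 50010 was the default
        // datanode transfer port at the time of this commit.
        sock.connect(new InetSocketAddress("127.0.0.1", 50010), 60 * 1000);
        sock.close();
      }
    }
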
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1308617&r1=1308616&r2=1308617&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java Mon Apr  2 23:20:34 2012
@@ -1171,7 +1171,7 @@ class DFSOutputStream extends FSOutputSu
       NetUtils.createSocketAddr(first.getXferAddr());
     final Socket sock = client.socketFactory.createSocket();
     final int timeout = client.getDatanodeReadTimeout(length);
-    NetUtils.connect(sock, isa, timeout);
+    NetUtils.connect(sock, isa, client.getRandomLocalInterfaceAddr(), timeout);
     sock.setSoTimeout(timeout);
     sock.setSendBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
     if(DFSClient.LOG.isDebugEnabled()) {

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml?rev=1308617&r1=1308616&r2=1308617&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml Mon Apr  2 23:20:34 2012
@@ -844,4 +844,18 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.client.local.interfaces</name>
+  <value></value>
+  <description>A comma-separated list of network interface names to use
+    for data transfer between the client and datanodes. When creating
+    a connection to read from or write to a datanode, the client
+    chooses one of the specified interfaces at random and binds its
+    socket to the IP of that interface. Individual names may be
+    specified as either an interface name (e.g. "eth0"), a sub-interface
+    name (e.g. "eth0:0"), or an IP address (which may be specified using
+    CIDR notation to match a range of IPs).
+  </description>
+</property>
+
 </configuration>

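As a usage illustration of the new property: a client host with two data NICs could spread its datanode connections across both. The interface names eth1 and eth2 and the CIDR range in the comment are hypothetical; the key constant is the one added to DFSConfigKeys above.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class MultiNicClient {
      public static void main(String[] args) throws Exception {
        Configuration conf = new HdfsConfiguration();
        // Data-transfer sockets bind to one of these NICs, chosen at random.
        conf.set(DFSConfigKeys.DFS_CLIENT_LOCAL_INTERFACES, "eth1,eth2");
        // A CIDR range covering the desired local IPs also works:
        //   conf.set(DFSConfigKeys.DFS_CLIENT_LOCAL_INTERFACES, "192.168.1.0/24");
        FileSystem fs = FileSystem.get(conf);
        System.out.println("Connected to " + fs.getUri());
      }
    }
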
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java?rev=1308617&r1=1308616&r2=1308617&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java Mon Apr  2 23:20:34 2012
@@ -38,6 +38,7 @@ import java.io.FileNotFoundException;
 import java.io.FileReader;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
 import java.util.EnumSet;
 
 import org.apache.commons.logging.LogFactory;
@@ -67,6 +68,7 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.io.IOUtils;
 import org.apache.log4j.Level;
 
+import static org.junit.Assume.assumeTrue;
 
 /**
  * This class tests various cases during file creation.
@@ -140,11 +142,34 @@ public class TestFileCreation extends ju
     }
   }
 
+  public void testFileCreation() throws IOException {
+    checkFileCreation(null);
+  }
+
+  /** Same test but the client should bind to a local interface */
+  public void testFileCreationSetLocalInterface() throws IOException {
+    assumeTrue(System.getProperty("os.name").startsWith("Linux"));
+
+    // The mini cluster listens on the loopback so we can use it here
+    checkFileCreation("lo");
+
+    try {
+      checkFileCreation("bogus-interface");
+      fail("Able to specify a bogus interface");
+    } catch (UnknownHostException e) {
+      assertEquals("No such interface bogus-interface", e.getMessage());
+    }
+  }
+
   /**
    * Test if file creation and disk space consumption works right
+   * @param netIf the local interface, if any, clients should use to access DNs
    */
-  public void testFileCreation() throws IOException {
+  public void checkFileCreation(String netIf) throws IOException {
     Configuration conf = new HdfsConfiguration();
+    if (netIf != null) {
+      conf.set(DFSConfigKeys.DFS_CLIENT_LOCAL_INTERFACES, netIf);
+    }
     if (simulatedStorage) {
       SimulatedFSDataset.setFactory(conf);
     }