You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ta...@apache.org on 2007/10/03 23:46:36 UTC

svn commit: r581734 - in /lucene/hadoop/trunk: ./ conf/ src/java/org/apache/hadoop/dfs/ src/java/org/apache/hadoop/ipc/ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/net/ src/test/org/apache/hadoop/ipc/

Author: taton
Date: Wed Oct  3 14:46:35 2007
New Revision: 581734

URL: http://svn.apache.org/viewvc?rev=581734&view=rev
Log:
HADOOP-1822.  Allow the specialization and configuration of socket factories. Provide a StandardSocketFactory, and a SocksSocketFactory to allow the use of SOCKS proxies. (taton).


Added:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/net/NetUtils.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/net/SocksSocketFactory.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/net/StandardSocketFactory.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/ipc/TestSocketFactory.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/conf/hadoop-default.xml
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/SecondaryNameNode.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=581734&r1=581733&r2=581734&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed Oct  3 14:46:35 2007
@@ -77,6 +77,10 @@
     HADOOP-1963.  Add a FileSystem implementation for the Kosmos
     Filesystem (KFS).  (Sriram Rao via cutting)
 
+    HADOOP-1822.  Allow the specialization and configuration of socket
+    factories. Provide a StandardSocketFactory, and a SocksSocketFactory to
+    allow the use of SOCKS proxies. (taton).
+
   OPTIMIZATIONS
 
     HADOOP-1910.  Reduce the number of RPCs that DistributedFileSystem.create()

Modified: lucene/hadoop/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/conf/hadoop-default.xml?rev=581734&r1=581733&r2=581734&view=diff
==============================================================================
--- lucene/hadoop/trunk/conf/hadoop-default.xml (original)
+++ lucene/hadoop/trunk/conf/hadoop-default.xml Wed Oct  3 14:46:35 2007
@@ -954,4 +954,39 @@
   </description>
 </property>
 
+<!-- Proxy Configuration -->
+
+<property>
+  <name>hadoop.rpc.socket.factory.class.default</name>
+  <value>org.apache.hadoop.net.StandardSocketFactory</value>
+  <description> Default SocketFactory to use. This parameter is expected to be
+    formatted as "package.FactoryClassName".
+  </description>
+</property>
+
+<property>
+  <name>hadoop.rpc.socket.factory.class.ClientProtocol</name>
+  <value></value>
+  <description> SocketFactory to use to connect to a DFS. If null or empty, use
+    hadoop.rpc.socket.factory.class.default. This socket factory is also used by
+    DFSClient to create sockets to DataNodes.
+  </description>
+</property>
+
+<property>
+  <name>hadoop.rpc.socket.factory.class.JobSubmissionProtocol</name>
+  <value></value>
+  <description> SocketFactory to use to connect to a Map/Reduce master
+    (JobTracker). If null or empty, then use hadoop.rpc.socket.factory.class.default.
+  </description>
+</property>
+
+<property>
+  <name>hadoop.socks.server</name>
+  <value></value>
+  <description> Address (host:port) of the SOCKS server to be used by the
+    SocksSocketFactory.
+  </description>
+</property>
+
 </configuration>

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?rev=581734&r1=581733&r2=581734&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Wed Oct  3 14:46:35 2007
@@ -23,6 +23,7 @@
 import org.apache.hadoop.io.retry.RetryProxy;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.ipc.*;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.dfs.DistributedFileSystem.DiskStatus;
 import org.apache.hadoop.util.*;
@@ -36,6 +37,8 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.ConcurrentHashMap;
 
+import javax.net.SocketFactory;
+
 /********************************************************
  * DFSClient can connect to a Hadoop Filesystem and 
  * perform basic file tasks.  It uses the ClientProtocol
@@ -60,6 +63,7 @@
   private long defaultBlockSize;
   private short defaultReplication;
   private LocalDirAllocator dirAllocator;
+  private SocketFactory socketFactory;
     
   /**
    * A map from name -> DFSOutputStream of files that are currently being
@@ -142,7 +146,8 @@
 
     return (ClientProtocol) RetryProxy.create(ClientProtocol.class,
         RPC.getProxy(ClientProtocol.class,
-            ClientProtocol.versionID, nameNodeAddr, conf),
+            ClientProtocol.versionID, nameNodeAddr, conf,
+            NetUtils.getSocketFactory(conf, ClientProtocol.class)),
         methodNameToPolicyMap);
   }
         
@@ -152,6 +157,7 @@
   public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf)
     throws IOException {
     this.conf = conf;
+    this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
     this.namenode = createNamenode(nameNodeAddr, conf);
     String taskId = conf.get("mapred.task.id");
     if (taskId != null) {
@@ -984,7 +990,7 @@
         InetSocketAddress targetAddr = retval.addr;
 
         try {
-          s = new Socket();
+          s = socketFactory.createSocket();
           s.connect(targetAddr, READ_TIMEOUT);
           s.setSoTimeout(READ_TIMEOUT);
           Block blk = targetBlock.getBlock();
@@ -1161,7 +1167,7 @@
         InetSocketAddress targetAddr = retval.addr;
             
         try {
-          dn = new Socket();
+          dn = socketFactory.createSocket();
           dn.connect(targetAddr, READ_TIMEOUT);
           dn.setSoTimeout(READ_TIMEOUT);
               
@@ -1490,7 +1496,7 @@
         //
         InetSocketAddress target = DataNode.createSocketAddr(nodes[0].getName());
         try {
-          s = new Socket();
+          s = socketFactory.createSocket();
           s.connect(target, READ_TIMEOUT);
           s.setSoTimeout(replication * READ_TIMEOUT);
         } catch (IOException ie) {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/SecondaryNameNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/SecondaryNameNode.java?rev=581734&r1=581733&r2=581734&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/SecondaryNameNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/SecondaryNameNode.java Wed Oct  3 14:46:35 2007
@@ -24,6 +24,7 @@
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.mapred.StatusHttpServer;
+import org.apache.hadoop.net.NetUtils;
 
 import java.io.*;
 import java.net.*;
@@ -93,8 +94,10 @@
     nameNodeAddr = DataNode.createSocketAddr(
                                              conf.get("fs.default.name", "local"));
     this.conf = conf;
-    this.namenode = (ClientProtocol) RPC.getProxy(ClientProtocol.class,
-                                                  ClientProtocol.versionID, nameNodeAddr, conf);
+    this.namenode =
+        (ClientProtocol) RPC.getProxy(ClientProtocol.class,
+            ClientProtocol.versionID, nameNodeAddr, conf, NetUtils
+                .getSocketFactory(conf, ClientProtocol.class));
 
     //
     // initialize the webserver for uploading files.

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java?rev=581734&r1=581733&r2=581734&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java Wed Oct  3 14:46:35 2007
@@ -31,19 +31,20 @@
 import java.io.BufferedOutputStream;
 import java.io.FilterInputStream;
 import java.io.FilterOutputStream;
-import java.io.OutputStream;
 
 import java.util.Hashtable;
 import java.util.Iterator;
 
+import javax.net.SocketFactory;
+
 import org.apache.commons.logging.*;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.dfs.FSConstants;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 
@@ -71,6 +72,7 @@
   private int maxIdleTime; //connections will be culled if it was idle for 
                            //maxIdleTime msecs
   private int maxRetries; //the max. no. of retries for socket connections
+  private SocketFactory socketFactory;           // how to create sockets
 
   /** A call waiting for a value. */
   private class Call {
@@ -146,7 +148,7 @@
       short failures = 0;
       while (true) {
         try {
-          this.socket = new Socket();
+          this.socket = socketFactory.createSocket();
           this.socket.connect(address, FSConstants.READ_TIMEOUT);
           break;
         } catch (IOException ie) { //SocketTimeoutException is also caught 
@@ -426,19 +428,29 @@
 
   /** Construct an IPC client whose values are of the given {@link Writable}
    * class. */
-  public Client(Class valueClass, Configuration conf) {
+  public Client(Class valueClass, Configuration conf, 
+      SocketFactory factory) {
     this.valueClass = valueClass;
     this.timeout = conf.getInt("ipc.client.timeout", 10000);
     this.maxIdleTime = conf.getInt("ipc.client.connection.maxidletime", 1000);
     this.maxRetries = conf.getInt("ipc.client.connect.max.retries", 10);
     this.conf = conf;
-
+    this.socketFactory = factory;
     Thread t = new ConnectionCuller();
     t.setDaemon(true);
     t.setName(valueClass.getName() + " Connection Culler");
     LOG.debug(valueClass.getName() + 
               "Connection culler maxidletime= " + maxIdleTime + "ms");
     t.start();
+  }
+
+  /**
+   * Construct an IPC client with the default SocketFactory
+   * @param valueClass
+   * @param conf
+   */
+  public Client(Class<?> valueClass, Configuration conf) {
+    this(valueClass, conf, NetUtils.getDefaultSocketFactory(conf));
   }
  
   /** Stop all threads related to this client.  No further calls may be made

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java?rev=581734&r1=581733&r2=581734&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java Wed Oct  3 14:46:35 2007
@@ -28,10 +28,15 @@
 import java.net.InetSocketAddress;
 import java.net.SocketTimeoutException;
 import java.io.*;
+import java.util.Map;
+import java.util.HashMap;
+
+import javax.net.SocketFactory;
 
 import org.apache.commons.logging.*;
 
 import org.apache.hadoop.io.*;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.conf.*;
 
 /** A simple RPC mechanism.
@@ -126,37 +131,51 @@
 
   }
 
-  private static Client CLIENT;
+  private static Map<SocketFactory, Client> CLIENTS =
+      new HashMap<SocketFactory, Client>();
 
-  private static synchronized Client getClient(Configuration conf) {
+  private static synchronized Client getClient(Configuration conf,
+      SocketFactory factory) {
     // Construct & cache client.  The configuration is only used for timeout,
     // and Clients have connection pools.  So we can either (a) lose some
     // connection pooling and leak sockets, or (b) use the same timeout for all
     // configurations.  Since the IPC is usually intended globally, not
     // per-job, we choose (a).
-    if (CLIENT == null) {
-      CLIENT = new Client(ObjectWritable.class, conf);
+    Client client = CLIENTS.get(factory);
+    if (client == null) {
+      client = new Client(ObjectWritable.class, conf, factory);
+      CLIENTS.put(factory, client);
     }
-    return CLIENT;
+    return client;
+  }
+  
+  /**
+   * Construct & cache client with the default SocketFactory.
+   * @param conf
+   * @return
+   */
+  private static Client getClient(Configuration conf) {
+    return getClient(conf, SocketFactory.getDefault());
   }
 
   /**
    * Stop all RPC client connections
    */
   public static synchronized void stopClient(){
-    if (CLIENT != null) {
-      CLIENT.stop();
-      CLIENT = null;
-    }
+    for (Client client : CLIENTS.values())
+      client.stop();
+    CLIENTS.clear();
   }
 
   private static class Invoker implements InvocationHandler {
     private InetSocketAddress address;
     private Client client;
 
-    public Invoker(InetSocketAddress address, Configuration conf) {
+    public Invoker(InetSocketAddress address, Configuration conf,
+        SocketFactory factory) {
+
       this.address = address;
-      this.client = getClient(conf);
+      this.client = getClient(conf, factory);
     }
 
     public Object invoke(Object proxy, Method method, Object[] args)
@@ -239,12 +258,14 @@
   }
   /** Construct a client-side proxy object that implements the named protocol,
    * talking to a server at the named address. */
-  public static VersionedProtocol getProxy(Class protocol, long clientVersion,
-                                           InetSocketAddress addr, Configuration conf) throws IOException {
-    VersionedProtocol proxy = (VersionedProtocol) Proxy.newProxyInstance(
-                                                                         protocol.getClassLoader(),
-                                                                         new Class[] { protocol },
-                                                                         new Invoker(addr, conf));
+  public static VersionedProtocol getProxy(Class<?> protocol,
+      long clientVersion, InetSocketAddress addr, Configuration conf,
+      SocketFactory factory) throws IOException {
+
+    VersionedProtocol proxy =
+        (VersionedProtocol) Proxy.newProxyInstance(
+            protocol.getClassLoader(), new Class[] { protocol },
+            new Invoker(addr, conf, factory));
     long serverVersion = proxy.getProtocolVersion(protocol.getName(), 
                                                   clientVersion);
     if (serverVersion == clientVersion) {
@@ -253,6 +274,24 @@
       throw new VersionMismatch(protocol.getName(), clientVersion, 
                                 serverVersion);
     }
+  }
+
+  /**
+   * Construct a client-side proxy object with the default SocketFactory
+   * 
+   * @param protocol
+   * @param clientVersion
+   * @param addr
+   * @param conf
+   * @return a proxy instance
+   * @throws IOException
+   */
+  public static VersionedProtocol getProxy(Class<?> protocol,
+      long clientVersion, InetSocketAddress addr, Configuration conf)
+      throws IOException {
+
+    return getProxy(protocol, clientVersion, addr, conf, NetUtils
+        .getDefaultSocketFactory(conf));
   }
 
   /** Expert: Make multiple, parallel calls to a set of servers. */

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java?rev=581734&r1=581733&r2=581734&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java Wed Oct  3 14:46:35 2007
@@ -55,6 +55,7 @@
 import org.apache.hadoop.io.retry.RetryProxy;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.mapred.TaskInProgress;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -267,9 +268,10 @@
   private JobSubmissionProtocol createProxy(InetSocketAddress addr,
                                             Configuration conf
                                             ) throws IOException {
-    JobSubmissionProtocol raw = (JobSubmissionProtocol) 
-      RPC.getProxy(JobSubmissionProtocol.class,
-                   JobSubmissionProtocol.versionID, addr, conf);
+    JobSubmissionProtocol raw =
+        (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,
+            JobSubmissionProtocol.versionID, addr, conf, NetUtils
+                .getSocketFactory(conf, JobSubmissionProtocol.class));
     RetryPolicy backoffPolicy =
       RetryPolicies.retryUpToMaximumCountWithProportionalSleep
       (5, 10, java.util.concurrent.TimeUnit.SECONDS);

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/net/NetUtils.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/net/NetUtils.java?rev=581734&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/net/NetUtils.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/net/NetUtils.java Wed Oct  3 14:46:35 2007
@@ -0,0 +1,78 @@
+package org.apache.hadoop.net;
+
+import javax.net.SocketFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hadoop.util.ReflectionUtils;
+
+public class NetUtils {
+
+  /**
+   * Get the socket factory for the given class according to its
+   * configuration parameter
+   * <tt>hadoop.rpc.socket.factory.class.&lt;ClassName&gt;</tt>. When no
+   * such parameter exists then fall back on the default socket factory as
+   * configured by <tt>hadoop.rpc.socket.factory.class.default</tt>. If
+   * this default socket factory is not configured, then fall back on the JVM
+   * default socket factory.
+   * 
+   * @param conf the configuration
+   * @param clazz the class (usually a {@link VersionedProtocol})
+   * @return a socket factory
+   */
+  public static SocketFactory getSocketFactory(Configuration conf,
+      Class<?> clazz) {
+
+    SocketFactory factory = null;
+
+    String propValue =
+        conf.get("hadoop.rpc.socket.factory.class." + clazz.getSimpleName());
+    if ((propValue != null) && (propValue.length() > 0))
+      factory = getSocketFactoryFromProperty(conf, propValue);
+
+    if (factory == null)
+      factory = getDefaultSocketFactory(conf);
+
+    return factory;
+  }
+
+  /**
+   * Get the default socket factory as specified by the configuration
+   * parameter <tt>hadoop.rpc.socket.factory.class.default</tt>
+   * 
+   * @param conf the configuration
+   * @return the default socket factory as specified in the configuration or
+   *         the JVM default socket factory if the configuration does not
+   *         contain a default socket factory property.
+   */
+  public static SocketFactory getDefaultSocketFactory(Configuration conf) {
+
+    String propValue = conf.get("hadoop.rpc.socket.factory.class.default");
+    if ((propValue == null) || (propValue.length() == 0))
+      return SocketFactory.getDefault();
+
+    return getSocketFactoryFromProperty(conf, propValue);
+  }
+
+  /**
+   * Get the socket factory corresponding to the given class name. The
+   * class is looked up through the configuration and instantiated by
+   * reflection. If the class cannot be found, raises a RuntimeException.
+   * 
+   * @param propValue the property which is the class name of the
+   *        SocketFactory to instantiate; assumed non null and non empty.
+   * @return a socket factory as defined in the property value.
+   */
+  public static SocketFactory getSocketFactoryFromProperty(
+      Configuration conf, String propValue) {
+
+    try {
+      Class<?> theClass = conf.getClassByName(propValue);
+      return (SocketFactory) ReflectionUtils.newInstance(theClass, conf);
+
+    } catch (ClassNotFoundException cnfe) {
+      throw new RuntimeException("Socket Factory class not found: " + cnfe);
+    }
+  }
+}

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/net/SocksSocketFactory.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/net/SocksSocketFactory.java?rev=581734&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/net/SocksSocketFactory.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/net/SocksSocketFactory.java Wed Oct  3 14:46:35 2007
@@ -0,0 +1,144 @@
+package org.apache.hadoop.net;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Proxy;
+import java.net.Socket;
+import java.net.UnknownHostException;
+
+import javax.net.SocketFactory;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Specialized SocketFactory to create sockets with a SOCKS proxy
+ */
+public class SocksSocketFactory extends SocketFactory implements
+    Configurable {
+
+  private Configuration conf;
+
+  private Proxy proxy;
+
+  /**
+   * Default empty constructor (for use with the reflection API).
+   */
+  public SocksSocketFactory() {
+    this.proxy = Proxy.NO_PROXY;
+  }
+
+  /**
+   * Constructor with a supplied Proxy
+   * 
+   * @param proxy the proxy to use to create sockets
+   */
+  public SocksSocketFactory(Proxy proxy) {
+    this.proxy = proxy;
+  }
+
+  /* @inheritDoc */
+  @Override
+  public Socket createSocket() throws IOException {
+
+    return new Socket(proxy);
+  }
+
+  /* @inheritDoc */
+  @Override
+  public Socket createSocket(InetAddress addr, int port) throws IOException {
+
+    Socket socket = createSocket();
+    socket.connect(new InetSocketAddress(addr, port));
+    return socket;
+  }
+
+  /* @inheritDoc */
+  @Override
+  public Socket createSocket(InetAddress addr, int port,
+      InetAddress localHostAddr, int localPort) throws IOException {
+
+    Socket socket = createSocket();
+    socket.bind(new InetSocketAddress(localHostAddr, localPort));
+    socket.connect(new InetSocketAddress(addr, port));
+    return socket;
+  }
+
+  /* @inheritDoc */
+  @Override
+  public Socket createSocket(String host, int port) throws IOException,
+      UnknownHostException {
+
+    Socket socket = createSocket();
+    socket.connect(new InetSocketAddress(host, port));
+    return socket;
+  }
+
+  /* @inheritDoc */
+  @Override
+  public Socket createSocket(String host, int port,
+      InetAddress localHostAddr, int localPort) throws IOException,
+      UnknownHostException {
+
+    Socket socket = createSocket();
+    socket.bind(new InetSocketAddress(localHostAddr, localPort));
+    socket.connect(new InetSocketAddress(host, port));
+    return socket;
+  }
+
+  /* @inheritDoc */
+  @Override
+  public int hashCode() {
+    return proxy.hashCode();
+  }
+
+  /* @inheritDoc */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (!(obj instanceof SocksSocketFactory))
+      return false;
+    final SocksSocketFactory other = (SocksSocketFactory) obj;
+    if (proxy == null) {
+      if (other.proxy != null)
+        return false;
+    } else if (!proxy.equals(other.proxy))
+      return false;
+    return true;
+  }
+
+  /* @inheritDoc */
+  public Configuration getConf() {
+    return this.conf;
+  }
+
+  /* @inheritDoc */
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+    String proxyStr = conf.get("hadoop.socks.server");
+    if ((proxyStr != null) && (proxyStr.length() > 0)) {
+      setProxy(proxyStr);
+    }
+  }
+
+  /**
+   * Set the proxy of this socket factory as described in the string
+   * parameter
+   * 
+   * @param proxyStr the proxy address using the format "host:port"
+   */
+  private void setProxy(String proxyStr) {
+    String[] strs = proxyStr.split(":", 2);
+    if (strs.length != 2)
+      throw new RuntimeException("Bad SOCKS proxy parameter: " + proxyStr);
+    String host = strs[0];
+    int port = Integer.parseInt(strs[1]);
+    this.proxy =
+        new Proxy(Proxy.Type.SOCKS, InetSocketAddress.createUnresolved(host,
+            port));
+  }
+}

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/net/StandardSocketFactory.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/net/StandardSocketFactory.java?rev=581734&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/net/StandardSocketFactory.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/net/StandardSocketFactory.java Wed Oct  3 14:46:35 2007
@@ -0,0 +1,89 @@
+package org.apache.hadoop.net;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.UnknownHostException;
+
+import javax.net.SocketFactory;
+
+/**
+ * Specialized SocketFactory to create standard (non-proxied) sockets
+ */
+public class StandardSocketFactory extends SocketFactory {
+
+  /**
+   * Default empty constructor (for use with the reflection API).
+   */
+  public StandardSocketFactory() {
+  }
+
+  /* @inheritDoc */
+  @Override
+  public Socket createSocket() throws IOException {
+    return new Socket();
+  }
+
+  /* @inheritDoc */
+  @Override
+  public Socket createSocket(InetAddress addr, int port) throws IOException {
+
+    Socket socket = createSocket();
+    socket.connect(new InetSocketAddress(addr, port));
+    return socket;
+  }
+
+  /* @inheritDoc */
+  @Override
+  public Socket createSocket(InetAddress addr, int port,
+      InetAddress localHostAddr, int localPort) throws IOException {
+
+    Socket socket = createSocket();
+    socket.bind(new InetSocketAddress(localHostAddr, localPort));
+    socket.connect(new InetSocketAddress(addr, port));
+    return socket;
+  }
+
+  /* @inheritDoc */
+  @Override
+  public Socket createSocket(String host, int port) throws IOException,
+      UnknownHostException {
+
+    Socket socket = createSocket();
+    socket.connect(new InetSocketAddress(host, port));
+    return socket;
+  }
+
+  /* @inheritDoc */
+  @Override
+  public Socket createSocket(String host, int port,
+      InetAddress localHostAddr, int localPort) throws IOException,
+      UnknownHostException {
+
+    Socket socket = createSocket();
+    socket.bind(new InetSocketAddress(localHostAddr, localPort));
+    socket.connect(new InetSocketAddress(host, port));
+    return socket;
+  }
+
+  /* @inheritDoc */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (!(obj instanceof StandardSocketFactory))
+      return false;
+    return true;
+  }
+
+  /* @inheritDoc */
+  @Override
+  public int hashCode() {
+    // Dummy hash code (to make find bugs happy)
+    return 47;
+  } 
+  
+}

Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/ipc/TestSocketFactory.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/ipc/TestSocketFactory.java?rev=581734&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/ipc/TestSocketFactory.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/ipc/TestSocketFactory.java Wed Oct  3 14:46:35 2007
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ipc;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.dfs.DistributedFileSystem;
+import org.apache.hadoop.dfs.MiniDFSCluster;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobStatus;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.net.StandardSocketFactory;
+
+/**
+ * This class checks that RPCs can use specialized socket factories.
+ */
+public class TestSocketFactory extends TestCase {
+
+  /**
+   * Check that we can reach a NameNode or a JobTracker using a specific
+   * socket factory
+   */
+  public void testSocketFactory() throws IOException {
+    // Create a standard mini-cluster
+    Configuration sconf = new Configuration();
+    MiniDFSCluster cluster = new MiniDFSCluster(sconf, 1, true, null);
+    final int nameNodePort = cluster.getNameNodePort();
+
+    // Get a reference to its DFS directly
+    FileSystem fs = cluster.getFileSystem();
+    assertTrue(fs instanceof DistributedFileSystem);
+    DistributedFileSystem directDfs = (DistributedFileSystem) fs;
+
+    // Get another reference via network using a specific socket factory
+    Configuration cconf = new Configuration();
+    cconf.set("fs.default.name", String.format("hdfs://localhost:%s/",
+        nameNodePort + 10));
+    cconf.set("hadoop.rpc.socket.factory.class.default",
+        "org.apache.hadoop.ipc.DummySocketFactory");
+    cconf.set("hadoop.rpc.socket.factory.class.ClientProtocol",
+        "org.apache.hadoop.ipc.DummySocketFactory");
+    cconf.set("hadoop.rpc.socket.factory.class.JobSubmissionProtocol",
+        "org.apache.hadoop.ipc.DummySocketFactory");
+
+    fs = FileSystem.get(cconf);
+    assertTrue(fs instanceof DistributedFileSystem);
+    DistributedFileSystem dfs = (DistributedFileSystem) fs;
+
+    JobClient client = null;
+
+    try {
+      // This will test RPC to the NameNode only.
+      // could we test Client-DataNode connections?
+      Path filePath = new Path("/dir");
+
+      assertFalse(directDfs.exists(filePath));
+      assertFalse(dfs.exists(filePath));
+
+      directDfs.mkdirs(filePath);
+      assertTrue(directDfs.exists(filePath));
+      assertTrue(dfs.exists(filePath));
+
+      // This will test RPC to a JobTracker
+      MiniMRCluster mr = new MiniMRCluster(1, fs.getUri().toString(), 1);
+      final int jobTrackerPort = mr.getJobTrackerPort();
+
+      JobConf jconf = new JobConf(cconf);
+      jconf.set("mapred.job.tracker", String.format("localhost:%d",
+          jobTrackerPort + 10));
+      client = new JobClient(jconf);
+
+      JobStatus[] jobs = client.jobsToComplete();
+      assertTrue(jobs.length == 0);
+
+    } finally {
+      try {
+        if (client != null)
+          client.close();
+      } catch (Exception ignored) {
+        // nothing we can do
+        ignored.printStackTrace();
+      }
+      try {
+        if (dfs != null)
+          dfs.close();
+
+      } catch (Exception ignored) {
+        // nothing we can do
+        ignored.printStackTrace();
+      }
+      try {
+        if (directDfs != null)
+          directDfs.close();
+
+      } catch (Exception ignored) {
+        // nothing we can do
+        ignored.printStackTrace();
+      }
+      try {
+        if (cluster != null)
+          cluster.shutdown();
+
+      } catch (Exception ignored) {
+        // nothing we can do
+        ignored.printStackTrace();
+      }
+    }
+  }
+}
+
+/**
+ * Dummy socket factory which shifts TCP ports by subtracting 10 when
+ * establishing a connection
+ */
+class DummySocketFactory extends StandardSocketFactory {
+  /**
+   * Default empty constructor (for use with the reflection API).
+   */
+  public DummySocketFactory() {
+  }
+
+  /* @inheritDoc */
+  @Override
+  public Socket createSocket() throws IOException {
+    return new Socket() {
+      @Override
+      public void connect(SocketAddress addr, int timeout)
+          throws IOException {
+
+        assert (addr instanceof InetSocketAddress);
+        InetSocketAddress iaddr = (InetSocketAddress) addr;
+        SocketAddress newAddr = null;
+        if (iaddr.isUnresolved())
+          newAddr =
+              new InetSocketAddress(iaddr.getHostName(),
+                  iaddr.getPort() - 10);
+        else
+          newAddr =
+              new InetSocketAddress(iaddr.getAddress(), iaddr.getPort() - 10);
+        System.out.printf("Test socket: rerouting %s to %s\n", iaddr,
+            newAddr);
+        super.connect(newAddr, timeout);
+      }
+    };
+  }
+
+  /* @inheritDoc */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (!(obj instanceof DummySocketFactory))
+      return false;
+    return true;
+  }
+
+  /* @inheritDoc */
+  @Override
+  public int hashCode() {
+    // Dummy hash code (to make find bugs happy)
+    return 53;
+  }
+}