You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ta...@apache.org on 2007/10/03 23:46:36 UTC
svn commit: r581734 - in /lucene/hadoop/trunk: ./ conf/
src/java/org/apache/hadoop/dfs/ src/java/org/apache/hadoop/ipc/
src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/net/
src/test/org/apache/hadoop/ipc/
Author: taton
Date: Wed Oct 3 14:46:35 2007
New Revision: 581734
URL: http://svn.apache.org/viewvc?rev=581734&view=rev
Log:
HADOOP-1822. Allow the specialization and configuration of socket factories. Provide a StandardSocketFactory, and a SocksSocketFactory to allow the use of SOCKS proxies. (taton).
Added:
lucene/hadoop/trunk/src/java/org/apache/hadoop/net/NetUtils.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/net/SocksSocketFactory.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/net/StandardSocketFactory.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/ipc/TestSocketFactory.java
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/conf/hadoop-default.xml
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/SecondaryNameNode.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=581734&r1=581733&r2=581734&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed Oct 3 14:46:35 2007
@@ -77,6 +77,10 @@
HADOOP-1963. Add a FileSystem implementation for the Kosmos
Filesystem (KFS). (Sriram Rao via cutting)
+ HADOOP-1822. Allow the specialization and configuration of socket
+ factories. Provide a StandardSocketFactory, and a SocksSocketFactory to
+ allow the use of SOCKS proxies. (taton).
+
OPTIMIZATIONS
HADOOP-1910. Reduce the number of RPCs that DistributedFileSystem.create()
Modified: lucene/hadoop/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/conf/hadoop-default.xml?rev=581734&r1=581733&r2=581734&view=diff
==============================================================================
--- lucene/hadoop/trunk/conf/hadoop-default.xml (original)
+++ lucene/hadoop/trunk/conf/hadoop-default.xml Wed Oct 3 14:46:35 2007
@@ -954,4 +954,39 @@
</description>
</property>
+<!-- Proxy Configuration -->
+
+<property>
+ <name>hadoop.rpc.socket.factory.class.default</name>
+ <value>org.apache.hadoop.net.StandardSocketFactory</value>
+ <description> Default SocketFactory to use. This parameter is expected to be
+ formatted as "package.FactoryClassName".
+ </description>
+</property>
+
+<property>
+ <name>hadoop.rpc.socket.factory.class.ClientProtocol</name>
+ <value></value>
+ <description> SocketFactory to use to connect to a DFS. If null or empty, use
+ hadoop.rpc.socket.factory.class.default. This socket factory is also used by
+ DFSClient to create sockets to DataNodes.
+ </description>
+</property>
+
+<property>
+ <name>hadoop.rpc.socket.factory.class.JobSubmissionProtocol</name>
+ <value></value>
+ <description> SocketFactory to use to connect to a Map/Reduce master
+ (JobTracker). If null or empty, then use hadoop.rpc.socket.factory.class.default.
+ </description>
+</property>
+
+<property>
+ <name>hadoop.socks.server</name>
+ <value></value>
+ <description> Address (host:port) of the SOCKS server to be used by the
+ SocksSocketFactory.
+ </description>
+</property>
+
</configuration>
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?rev=581734&r1=581733&r2=581734&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Wed Oct 3 14:46:35 2007
@@ -23,6 +23,7 @@
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.ipc.*;
+import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.dfs.DistributedFileSystem.DiskStatus;
import org.apache.hadoop.util.*;
@@ -36,6 +37,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ConcurrentHashMap;
+import javax.net.SocketFactory;
+
/********************************************************
* DFSClient can connect to a Hadoop Filesystem and
* perform basic file tasks. It uses the ClientProtocol
@@ -60,6 +63,7 @@
private long defaultBlockSize;
private short defaultReplication;
private LocalDirAllocator dirAllocator;
+ private SocketFactory socketFactory;
/**
* A map from name -> DFSOutputStream of files that are currently being
@@ -142,7 +146,8 @@
return (ClientProtocol) RetryProxy.create(ClientProtocol.class,
RPC.getProxy(ClientProtocol.class,
- ClientProtocol.versionID, nameNodeAddr, conf),
+ ClientProtocol.versionID, nameNodeAddr, conf,
+ NetUtils.getSocketFactory(conf, ClientProtocol.class)),
methodNameToPolicyMap);
}
@@ -152,6 +157,7 @@
public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf)
throws IOException {
this.conf = conf;
+ this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
this.namenode = createNamenode(nameNodeAddr, conf);
String taskId = conf.get("mapred.task.id");
if (taskId != null) {
@@ -984,7 +990,7 @@
InetSocketAddress targetAddr = retval.addr;
try {
- s = new Socket();
+ s = socketFactory.createSocket();
s.connect(targetAddr, READ_TIMEOUT);
s.setSoTimeout(READ_TIMEOUT);
Block blk = targetBlock.getBlock();
@@ -1161,7 +1167,7 @@
InetSocketAddress targetAddr = retval.addr;
try {
- dn = new Socket();
+ dn = socketFactory.createSocket();
dn.connect(targetAddr, READ_TIMEOUT);
dn.setSoTimeout(READ_TIMEOUT);
@@ -1490,7 +1496,7 @@
//
InetSocketAddress target = DataNode.createSocketAddr(nodes[0].getName());
try {
- s = new Socket();
+ s = socketFactory.createSocket();
s.connect(target, READ_TIMEOUT);
s.setSoTimeout(replication * READ_TIMEOUT);
} catch (IOException ie) {
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/SecondaryNameNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/SecondaryNameNode.java?rev=581734&r1=581733&r2=581734&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/SecondaryNameNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/SecondaryNameNode.java Wed Oct 3 14:46:35 2007
@@ -24,6 +24,7 @@
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.mapred.StatusHttpServer;
+import org.apache.hadoop.net.NetUtils;
import java.io.*;
import java.net.*;
@@ -93,8 +94,10 @@
nameNodeAddr = DataNode.createSocketAddr(
conf.get("fs.default.name", "local"));
this.conf = conf;
- this.namenode = (ClientProtocol) RPC.getProxy(ClientProtocol.class,
- ClientProtocol.versionID, nameNodeAddr, conf);
+ this.namenode =
+ (ClientProtocol) RPC.getProxy(ClientProtocol.class,
+ ClientProtocol.versionID, nameNodeAddr, conf, NetUtils
+ .getSocketFactory(conf, ClientProtocol.class));
//
// initialize the webserver for uploading files.
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java?rev=581734&r1=581733&r2=581734&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Client.java Wed Oct 3 14:46:35 2007
@@ -31,19 +31,20 @@
import java.io.BufferedOutputStream;
import java.io.FilterInputStream;
import java.io.FilterOutputStream;
-import java.io.OutputStream;
import java.util.Hashtable;
import java.util.Iterator;
+import javax.net.SocketFactory;
+
import org.apache.commons.logging.*;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.dfs.FSConstants;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
@@ -71,6 +72,7 @@
private int maxIdleTime; //connections will be culled if it was idle for
//maxIdleTime msecs
private int maxRetries; //the max. no. of retries for socket connections
+ private SocketFactory socketFactory; // how to create sockets
/** A call waiting for a value. */
private class Call {
@@ -146,7 +148,7 @@
short failures = 0;
while (true) {
try {
- this.socket = new Socket();
+ this.socket = socketFactory.createSocket();
this.socket.connect(address, FSConstants.READ_TIMEOUT);
break;
} catch (IOException ie) { //SocketTimeoutException is also caught
@@ -426,19 +428,29 @@
/** Construct an IPC client whose values are of the given {@link Writable}
* class. */
- public Client(Class valueClass, Configuration conf) {
+ public Client(Class valueClass, Configuration conf,
+ SocketFactory factory) {
this.valueClass = valueClass;
this.timeout = conf.getInt("ipc.client.timeout", 10000);
this.maxIdleTime = conf.getInt("ipc.client.connection.maxidletime", 1000);
this.maxRetries = conf.getInt("ipc.client.connect.max.retries", 10);
this.conf = conf;
-
+ this.socketFactory = factory;
Thread t = new ConnectionCuller();
t.setDaemon(true);
t.setName(valueClass.getName() + " Connection Culler");
LOG.debug(valueClass.getName() +
"Connection culler maxidletime= " + maxIdleTime + "ms");
t.start();
+ }
+
+ /**
+ * Construct an IPC client with the default SocketFactory
+ * @param valueClass
+ * @param conf
+ */
+ public Client(Class<?> valueClass, Configuration conf) {
+ this(valueClass, conf, NetUtils.getDefaultSocketFactory(conf));
}
/** Stop all threads related to this client. No further calls may be made
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java?rev=581734&r1=581733&r2=581734&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/RPC.java Wed Oct 3 14:46:35 2007
@@ -28,10 +28,15 @@
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.io.*;
+import java.util.Map;
+import java.util.HashMap;
+
+import javax.net.SocketFactory;
import org.apache.commons.logging.*;
import org.apache.hadoop.io.*;
+import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.conf.*;
/** A simple RPC mechanism.
@@ -126,37 +131,51 @@
}
- private static Client CLIENT;
+ private static Map<SocketFactory, Client> CLIENTS =
+ new HashMap<SocketFactory, Client>();
- private static synchronized Client getClient(Configuration conf) {
+ private static synchronized Client getClient(Configuration conf,
+ SocketFactory factory) {
// Construct & cache client. The configuration is only used for timeout,
// and Clients have connection pools. So we can either (a) lose some
// connection pooling and leak sockets, or (b) use the same timeout for all
// configurations. Since the IPC is usually intended globally, not
// per-job, we choose (a).
- if (CLIENT == null) {
- CLIENT = new Client(ObjectWritable.class, conf);
+ Client client = CLIENTS.get(factory);
+ if (client == null) {
+ client = new Client(ObjectWritable.class, conf, factory);
+ CLIENTS.put(factory, client);
}
- return CLIENT;
+ return client;
+ }
+
+ /**
+ * Construct & cache client with the default SocketFactory.
+ * @param conf
+ * @return
+ */
+ private static Client getClient(Configuration conf) {
+ return getClient(conf, SocketFactory.getDefault());
}
/**
* Stop all RPC client connections
*/
public static synchronized void stopClient(){
- if (CLIENT != null) {
- CLIENT.stop();
- CLIENT = null;
- }
+ for (Client client : CLIENTS.values())
+ client.stop();
+ CLIENTS.clear();
}
private static class Invoker implements InvocationHandler {
private InetSocketAddress address;
private Client client;
- public Invoker(InetSocketAddress address, Configuration conf) {
+ public Invoker(InetSocketAddress address, Configuration conf,
+ SocketFactory factory) {
+
this.address = address;
- this.client = getClient(conf);
+ this.client = getClient(conf, factory);
}
public Object invoke(Object proxy, Method method, Object[] args)
@@ -239,12 +258,14 @@
}
/** Construct a client-side proxy object that implements the named protocol,
* talking to a server at the named address. */
- public static VersionedProtocol getProxy(Class protocol, long clientVersion,
- InetSocketAddress addr, Configuration conf) throws IOException {
- VersionedProtocol proxy = (VersionedProtocol) Proxy.newProxyInstance(
- protocol.getClassLoader(),
- new Class[] { protocol },
- new Invoker(addr, conf));
+ public static VersionedProtocol getProxy(Class<?> protocol,
+ long clientVersion, InetSocketAddress addr, Configuration conf,
+ SocketFactory factory) throws IOException {
+
+ VersionedProtocol proxy =
+ (VersionedProtocol) Proxy.newProxyInstance(
+ protocol.getClassLoader(), new Class[] { protocol },
+ new Invoker(addr, conf, factory));
long serverVersion = proxy.getProtocolVersion(protocol.getName(),
clientVersion);
if (serverVersion == clientVersion) {
@@ -253,6 +274,24 @@
throw new VersionMismatch(protocol.getName(), clientVersion,
serverVersion);
}
+ }
+
+ /**
+ * Construct a client-side proxy object with the default SocketFactory
+ *
+ * @param protocol
+ * @param clientVersion
+ * @param addr
+ * @param conf
+ * @return a proxy instance
+ * @throws IOException
+ */
+ public static VersionedProtocol getProxy(Class<?> protocol,
+ long clientVersion, InetSocketAddress addr, Configuration conf)
+ throws IOException {
+
+ return getProxy(protocol, clientVersion, addr, conf, NetUtils
+ .getDefaultSocketFactory(conf));
}
/** Expert: Make multiple, parallel calls to a set of servers. */
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java?rev=581734&r1=581733&r2=581734&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java Wed Oct 3 14:46:35 2007
@@ -55,6 +55,7 @@
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.mapred.TaskInProgress;
+import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@@ -267,9 +268,10 @@
private JobSubmissionProtocol createProxy(InetSocketAddress addr,
Configuration conf
) throws IOException {
- JobSubmissionProtocol raw = (JobSubmissionProtocol)
- RPC.getProxy(JobSubmissionProtocol.class,
- JobSubmissionProtocol.versionID, addr, conf);
+ JobSubmissionProtocol raw =
+ (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,
+ JobSubmissionProtocol.versionID, addr, conf, NetUtils
+ .getSocketFactory(conf, JobSubmissionProtocol.class));
RetryPolicy backoffPolicy =
RetryPolicies.retryUpToMaximumCountWithProportionalSleep
(5, 10, java.util.concurrent.TimeUnit.SECONDS);
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/net/NetUtils.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/net/NetUtils.java?rev=581734&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/net/NetUtils.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/net/NetUtils.java Wed Oct 3 14:46:35 2007
@@ -0,0 +1,78 @@
+package org.apache.hadoop.net;
+
+import javax.net.SocketFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hadoop.util.ReflectionUtils;
+
+public class NetUtils {
+
+ /**
+ * Get the socket factory for the given class according to its
+ * configuration parameter
+ * <tt>hadoop.rpc.socket.factory.class.&lt;ClassName&gt;</tt>, where
+ * <tt>ClassName</tt> is the simple name of the given class. When no
+ * such parameter exists (or it is empty) then fall back on the default
+ * socket factory as configured by
+ * <tt>hadoop.rpc.socket.factory.class.default</tt>. If
+ * this default socket factory is not configured, then fall back on the JVM
+ * default socket factory.
+ *
+ * @param conf the configuration
+ * @param clazz the class (usually a {@link VersionedProtocol})
+ * @return a socket factory
+ */
+ public static SocketFactory getSocketFactory(Configuration conf,
+ Class<?> clazz) {
+
+ SocketFactory factory = null;
+
+ String propValue =
+ conf.get("hadoop.rpc.socket.factory.class." + clazz.getSimpleName());
+ if ((propValue != null) && (propValue.length() > 0))
+ factory = getSocketFactoryFromProperty(conf, propValue);
+
+ if (factory == null)
+ factory = getDefaultSocketFactory(conf);
+
+ return factory;
+ }
+
+ /**
+ * Get the default socket factory as specified by the configuration
+ * parameter <tt>hadoop.rpc.socket.factory.class.default</tt>
+ *
+ * @param conf the configuration
+ * @return the default socket factory as specified in the configuration or
+ * the JVM default socket factory if the configuration does not
+ * contain a default socket factory property (or it is empty).
+ */
+ public static SocketFactory getDefaultSocketFactory(Configuration conf) {
+
+ String propValue = conf.get("hadoop.rpc.socket.factory.class.default");
+ if ((propValue == null) || (propValue.length() == 0))
+ return SocketFactory.getDefault();
+
+ return getSocketFactoryFromProperty(conf, propValue);
+ }
+
+ /**
+ * Instantiate the socket factory whose class name is given by a
+ * configuration property value. The class is looked up through
+ * <tt>conf.getClassByName</tt>, instantiated through
+ * <tt>ReflectionUtils.newInstance(theClass, conf)</tt> and cast to
+ * {@link SocketFactory}.
+ *
+ * @param conf the configuration used to load and instantiate the class
+ * @param propValue the property which is the class name of the
+ * SocketFactory to instantiate; assumed non null and non empty.
+ * @return a socket factory as defined in the property value.
+ * @throws RuntimeException if the named class cannot be found
+ */
+ public static SocketFactory getSocketFactoryFromProperty(
+ Configuration conf, String propValue) {
+
+ try {
+ Class<?> theClass = conf.getClassByName(propValue);
+ return (SocketFactory) ReflectionUtils.newInstance(theClass, conf);
+
+ } catch (ClassNotFoundException cnfe) {
+ throw new RuntimeException("Socket Factory class not found: " + cnfe);
+ }
+ }
+}
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/net/SocksSocketFactory.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/net/SocksSocketFactory.java?rev=581734&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/net/SocksSocketFactory.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/net/SocksSocketFactory.java Wed Oct 3 14:46:35 2007
@@ -0,0 +1,144 @@
+package org.apache.hadoop.net;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Proxy;
+import java.net.Socket;
+import java.net.UnknownHostException;
+
+import javax.net.SocketFactory;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Specialized SocketFactory to create sockets with a SOCKS proxy
+ */
+public class SocksSocketFactory extends SocketFactory implements
+ Configurable {
+
+ private Configuration conf;
+
+ private Proxy proxy;
+
+ /**
+ * Default empty constructor (for use with the reflection API). The
+ * factory starts with {@link Proxy#NO_PROXY}; a SOCKS proxy is only
+ * installed when {@link #setConf(Configuration)} finds a non-empty
+ * <tt>hadoop.socks.server</tt> value.
+ */
+ public SocksSocketFactory() {
+ this.proxy = Proxy.NO_PROXY;
+ }
+
+ /**
+ * Constructor with a supplied Proxy
+ *
+ * @param proxy the proxy to use to create sockets
+ */
+ public SocksSocketFactory(Proxy proxy) {
+ this.proxy = proxy;
+ }
+
+ /** Create an unconnected socket that uses this factory's proxy. */
+ @Override
+ public Socket createSocket() throws IOException {
+
+ return new Socket(proxy);
+ }
+
+ /** Create a socket through the proxy and connect it to addr:port. */
+ @Override
+ public Socket createSocket(InetAddress addr, int port) throws IOException {
+
+ Socket socket = createSocket();
+ socket.connect(new InetSocketAddress(addr, port));
+ return socket;
+ }
+
+ /**
+ * Create a socket through the proxy, bind it to the given local
+ * address and port, then connect it to addr:port.
+ */
+ @Override
+ public Socket createSocket(InetAddress addr, int port,
+ InetAddress localHostAddr, int localPort) throws IOException {
+
+ Socket socket = createSocket();
+ socket.bind(new InetSocketAddress(localHostAddr, localPort));
+ socket.connect(new InetSocketAddress(addr, port));
+ return socket;
+ }
+
+ /** Create a socket through the proxy and connect it to host:port. */
+ @Override
+ public Socket createSocket(String host, int port) throws IOException,
+ UnknownHostException {
+
+ Socket socket = createSocket();
+ socket.connect(new InetSocketAddress(host, port));
+ return socket;
+ }
+
+ /**
+ * Create a socket through the proxy, bind it to the given local
+ * address and port, then connect it to host:port.
+ */
+ @Override
+ public Socket createSocket(String host, int port,
+ InetAddress localHostAddr, int localPort) throws IOException,
+ UnknownHostException {
+
+ Socket socket = createSocket();
+ socket.bind(new InetSocketAddress(localHostAddr, localPort));
+ socket.connect(new InetSocketAddress(host, port));
+ return socket;
+ }
+
+ /** The hash code is the hash code of the proxy. */
+ @Override
+ public int hashCode() {
+ return proxy.hashCode();
+ }
+
+ /**
+ * Two SocksSocketFactory instances are equal when their proxies are
+ * equal (or both null).
+ */
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (!(obj instanceof SocksSocketFactory))
+ return false;
+ final SocksSocketFactory other = (SocksSocketFactory) obj;
+ if (proxy == null) {
+ if (other.proxy != null)
+ return false;
+ } else if (!proxy.equals(other.proxy))
+ return false;
+ return true;
+ }
+
+ /** Return the configuration last passed to setConf (may be null). */
+ public Configuration getConf() {
+ return this.conf;
+ }
+
+ /**
+ * Store the configuration and, if <tt>hadoop.socks.server</tt> is set
+ * to a non-empty value, use it as the SOCKS proxy address. Otherwise
+ * the current proxy is left unchanged.
+ */
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ String proxyStr = conf.get("hadoop.socks.server");
+ if ((proxyStr != null) && (proxyStr.length() > 0)) {
+ setProxy(proxyStr);
+ }
+ }
+
+ /**
+ * Set the proxy of this socket factory as described in the string
+ * parameter. The string is split on the first ':' only; the proxy
+ * address is created unresolved.
+ *
+ * @param proxyStr the proxy address using the format "host:port"
+ * @throws RuntimeException if the string contains no ':' separator
+ * @throws NumberFormatException if the port part is not an integer
+ * (raised by Integer.parseInt)
+ */
+ private void setProxy(String proxyStr) {
+ String[] strs = proxyStr.split(":", 2);
+ if (strs.length != 2)
+ throw new RuntimeException("Bad SOCKS proxy parameter: " + proxyStr);
+ String host = strs[0];
+ int port = Integer.parseInt(strs[1]);
+ this.proxy =
+ new Proxy(Proxy.Type.SOCKS, InetSocketAddress.createUnresolved(host,
+ port));
+ }
+}
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/net/StandardSocketFactory.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/net/StandardSocketFactory.java?rev=581734&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/net/StandardSocketFactory.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/net/StandardSocketFactory.java Wed Oct 3 14:46:35 2007
@@ -0,0 +1,89 @@
+package org.apache.hadoop.net;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.UnknownHostException;
+
+import javax.net.SocketFactory;
+
+/**
+ * Standard SocketFactory which creates plain sockets, without any proxy.
+ */
+public class StandardSocketFactory extends SocketFactory {
+
+ /**
+ * Default empty constructor (for use with the reflection API).
+ */
+ public StandardSocketFactory() {
+ }
+
+ /** Create a plain, unconnected socket. */
+ @Override
+ public Socket createSocket() throws IOException {
+ return new Socket();
+ }
+
+ /** Create a socket and connect it to addr:port. */
+ @Override
+ public Socket createSocket(InetAddress addr, int port) throws IOException {
+
+ Socket socket = createSocket();
+ socket.connect(new InetSocketAddress(addr, port));
+ return socket;
+ }
+
+ /**
+ * Create a socket, bind it to the given local address and port, then
+ * connect it to addr:port.
+ */
+ @Override
+ public Socket createSocket(InetAddress addr, int port,
+ InetAddress localHostAddr, int localPort) throws IOException {
+
+ Socket socket = createSocket();
+ socket.bind(new InetSocketAddress(localHostAddr, localPort));
+ socket.connect(new InetSocketAddress(addr, port));
+ return socket;
+ }
+
+ /** Create a socket and connect it to host:port. */
+ @Override
+ public Socket createSocket(String host, int port) throws IOException,
+ UnknownHostException {
+
+ Socket socket = createSocket();
+ socket.connect(new InetSocketAddress(host, port));
+ return socket;
+ }
+
+ /**
+ * Create a socket, bind it to the given local address and port, then
+ * connect it to host:port.
+ */
+ @Override
+ public Socket createSocket(String host, int port,
+ InetAddress localHostAddr, int localPort) throws IOException,
+ UnknownHostException {
+
+ Socket socket = createSocket();
+ socket.bind(new InetSocketAddress(localHostAddr, localPort));
+ socket.connect(new InetSocketAddress(host, port));
+ return socket;
+ }
+
+ /**
+ * Any two StandardSocketFactory instances (including subclass
+ * instances) are considered equal, since the factory holds no state.
+ */
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (!(obj instanceof StandardSocketFactory))
+ return false;
+ return true;
+ }
+
+ /** Constant hash code, consistent with equals(). */
+ @Override
+ public int hashCode() {
+ // Constant hash code, since all instances are equal (keeps FindBugs happy)
+ return 47;
+ }
+
+}
Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/ipc/TestSocketFactory.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/ipc/TestSocketFactory.java?rev=581734&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/ipc/TestSocketFactory.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/ipc/TestSocketFactory.java Wed Oct 3 14:46:35 2007
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ipc;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.dfs.DistributedFileSystem;
+import org.apache.hadoop.dfs.MiniDFSCluster;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobStatus;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.net.StandardSocketFactory;
+
+/**
+ * This class checks that RPCs can use specialized socket factories.
+ */
+public class TestSocketFactory extends TestCase {
+
+ /**
+ * Check that we can reach a NameNode or a JobTracker using a specific
+ * socket factory. The client configuration deliberately advertises
+ * ports that are 10 higher than the real ones, and DummySocketFactory
+ * subtracts 10 from the port when connecting, so the calls below are
+ * only expected to reach the servers if the configured socket factory
+ * is really used.
+ */
+ public void testSocketFactory() throws IOException {
+ // Create a standard mini-cluster
+ Configuration sconf = new Configuration();
+ MiniDFSCluster cluster = new MiniDFSCluster(sconf, 1, true, null);
+ final int nameNodePort = cluster.getNameNodePort();
+
+ // Get a reference to its DFS directly
+ FileSystem fs = cluster.getFileSystem();
+ assertTrue(fs instanceof DistributedFileSystem);
+ DistributedFileSystem directDfs = (DistributedFileSystem) fs;
+
+ // Get another reference via network using a specific socket factory
+ Configuration cconf = new Configuration();
+ cconf.set("fs.default.name", String.format("hdfs://localhost:%s/",
+ nameNodePort + 10));
+ cconf.set("hadoop.rpc.socket.factory.class.default",
+ "org.apache.hadoop.ipc.DummySocketFactory");
+ cconf.set("hadoop.rpc.socket.factory.class.ClientProtocol",
+ "org.apache.hadoop.ipc.DummySocketFactory");
+ cconf.set("hadoop.rpc.socket.factory.class.JobSubmissionProtocol",
+ "org.apache.hadoop.ipc.DummySocketFactory");
+
+ fs = FileSystem.get(cconf);
+ assertTrue(fs instanceof DistributedFileSystem);
+ DistributedFileSystem dfs = (DistributedFileSystem) fs;
+
+ JobClient client = null;
+
+ try {
+ // This will test RPC to the NameNode only.
+ // could we test Client-DataNode connections?
+ Path filePath = new Path("/dir");
+
+ assertFalse(directDfs.exists(filePath));
+ assertFalse(dfs.exists(filePath));
+
+ directDfs.mkdirs(filePath);
+ assertTrue(directDfs.exists(filePath));
+ assertTrue(dfs.exists(filePath));
+
+ // This will test RPC to a JobTracker
+ MiniMRCluster mr = new MiniMRCluster(1, fs.getUri().toString(), 1);
+ final int jobTrackerPort = mr.getJobTrackerPort();
+
+ JobConf jconf = new JobConf(cconf);
+ jconf.set("mapred.job.tracker", String.format("localhost:%d",
+ jobTrackerPort + 10));
+ client = new JobClient(jconf);
+
+ JobStatus[] jobs = client.jobsToComplete();
+ assertTrue(jobs.length == 0);
+
+ } finally {
+ try {
+ if (client != null)
+ client.close();
+ } catch (Exception ignored) {
+ // nothing we can do
+ ignored.printStackTrace();
+ }
+ try {
+ if (dfs != null)
+ dfs.close();
+
+ } catch (Exception ignored) {
+ // nothing we can do
+ ignored.printStackTrace();
+ }
+ try {
+ if (directDfs != null)
+ directDfs.close();
+
+ } catch (Exception ignored) {
+ // nothing we can do
+ ignored.printStackTrace();
+ }
+ try {
+ if (cluster != null)
+ cluster.shutdown();
+
+ } catch (Exception ignored) {
+ // nothing we can do
+ ignored.printStackTrace();
+ }
+ }
+ }
+}
+
+/**
+ * Dummy socket factory which shifts TCP ports by subtracting 10 when
+ * establishing a connection
+ */
+class DummySocketFactory extends StandardSocketFactory {
+ /**
+ * Default empty constructor (for use with the reflection API).
+ */
+ public DummySocketFactory() {
+ }
+
+ /**
+ * Create an unconnected socket whose connect(addr, timeout) reroutes
+ * the connection to the same host on the requested port minus 10.
+ */
+ @Override
+ public Socket createSocket() throws IOException {
+ return new Socket() {
+ @Override
+ public void connect(SocketAddress addr, int timeout)
+ throws IOException {
+
+ assert (addr instanceof InetSocketAddress);
+ InetSocketAddress iaddr = (InetSocketAddress) addr;
+ SocketAddress newAddr = null;
+ if (iaddr.isUnresolved())
+ newAddr =
+ new InetSocketAddress(iaddr.getHostName(),
+ iaddr.getPort() - 10);
+ else
+ newAddr =
+ new InetSocketAddress(iaddr.getAddress(), iaddr.getPort() - 10);
+ System.out.printf("Test socket: rerouting %s to %s\n", iaddr,
+ newAddr);
+ super.connect(newAddr, timeout);
+ }
+ };
+ }
+
+ /** Any two DummySocketFactory instances are considered equal. */
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (!(obj instanceof DummySocketFactory))
+ return false;
+ return true;
+ }
+
+ /** Constant hash code, consistent with equals(). */
+ @Override
+ public int hashCode() {
+ // Constant hash code, since all instances are equal (keeps FindBugs happy)
+ return 53;
+ }
+}