You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:24:40 UTC
svn commit: r1077528 - in
/hadoop/common/branches/branch-0.20-security-patches/src:
core/org/apache/hadoop/fs/CommonConfigurationKeys.java
core/org/apache/hadoop/ipc/Server.java
test/org/apache/hadoop/ipc/TestRPC.java
Author: omalley
Date: Fri Mar 4 04:24:40 2011
New Revision: 1077528
URL: http://svn.apache.org/viewvc?rev=1077528&view=rev
Log:
commit 048d12257c35102e92c89164906fe6af21161c0c
Author: Bharath Mundlapudi <bh...@yahoo-inc.com>
Date: Fri Jul 2 17:37:37 2010 -0700
HADOOP-6713 from https://issues.apache.org/jira/secure/attachment/12448613/HADOOP-6713-rel20.3.patch
+++ b/YAHOO-CHANGES.txt
+ HADOOP-6713. Multiple RPC Reader Threads (Bharathm)
+
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/fs/CommonConfigurationKeys.java
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/Server.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/ipc/TestRPC.java
Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/fs/CommonConfigurationKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/fs/CommonConfigurationKeys.java?rev=1077528&r1=1077527&r2=1077528&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/fs/CommonConfigurationKeys.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/fs/CommonConfigurationKeys.java Fri Mar 4 04:24:40 2011
@@ -42,5 +42,9 @@ public class CommonConfigurationKeys {
/** See src/core/core-default.xml */
public static final String HADOOP_SECURITY_SERVICE_USER_NAME_KEY =
"hadoop.security.service.user.name.key";
+ public static final String IPC_SERVER_RPC_READ_THREADS_KEY =
+ "ipc.server.read.threadpool.size";
+ public static final int IPC_SERVER_RPC_READ_THREADS_DEFAULT = 1;
+
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/Server.java?rev=1077528&r1=1077527&r2=1077528&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/Server.java Fri Mar 4 04:24:40 2011
@@ -51,6 +51,8 @@ import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslException;
@@ -173,6 +175,7 @@ public abstract class Server {
private String bindAddress;
private int port; // port we listen on
private int handlerCount; // number of handler threads
+ private int readThreads; // number of read threads
private Class<? extends Writable> paramClass; // class of call parameters
private int maxIdleTime; // the maximum idle time after
// which a client may be disconnected
@@ -278,6 +281,8 @@ public abstract class Server {
private ServerSocketChannel acceptChannel = null; //the accept channel
private Selector selector = null; //the selector that we use for the server
+ private Reader[] readers = null;
+ private int currentReader = 0;
private InetSocketAddress address; //the address we bind at
private Random rand = new Random();
private long lastCleanupRunTime = 0; //the last time when a cleanup connec-
@@ -285,7 +290,8 @@ public abstract class Server {
private long cleanupInterval = 10000; //the minimum interval between
//two cleanup runs
private int backlogLength = conf.getInt("ipc.server.listen.queue.size", 128);
-
+ private ExecutorService readPool;
+
public Listener() throws IOException {
address = new InetSocketAddress(bindAddress, port);
// Create a new server socket and set to non blocking mode
@@ -297,12 +303,85 @@ public abstract class Server {
port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
// create a selector;
selector= Selector.open();
+ readers = new Reader[readThreads];
+ readPool = Executors.newFixedThreadPool(readThreads);
+ for (int i = 0; i < readThreads; i++) {
+ Selector readSelector = Selector.open();
+ Reader reader = new Reader(readSelector);
+ readers[i] = reader;
+ readPool.execute(reader);
+ }
// Register accepts on the server socket with the selector.
acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
this.setName("IPC Server listener on " + port);
this.setDaemon(true);
}
+
+ private class Reader implements Runnable {
+ private volatile boolean adding = false;
+ private Selector readSelector = null;
+
+ Reader(Selector readSelector) {
+ this.readSelector = readSelector;
+ }
+ public void run() {
+ LOG.info("Starting SocketReader");
+ synchronized (this) {
+ while (running) {
+ SelectionKey key = null;
+ try {
+ readSelector.select();
+ while (adding) {
+ this.wait(1000);
+ }
+
+ Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
+ while (iter.hasNext()) {
+ key = iter.next();
+ iter.remove();
+ if (key.isValid()) {
+ if (key.isReadable()) {
+ doRead(key);
+ }
+ }
+ key = null;
+ }
+ } catch (InterruptedException e) {
+ if (running) { // unexpected -- log it
+ LOG.info(getName() + " caught: " +
+ StringUtils.stringifyException(e));
+ }
+ } catch (IOException ex) {
+ LOG.error("Error in Reader", ex);
+ }
+ }
+ }
+ }
+
+ /**
+ * Puts this reader into a state where it waits for a new channel
+ * to be registered with readSelector. If the reader thread is blocked in
+ * select(), it is woken up; otherwise the next call to select()
+ * returns immediately, even if there is nothing to read. Either way the
+ * thread then waits in the while (adding) loop until finishAdd() is called.
+ */
+ public void startAdd() {
+ adding = true;
+ readSelector.wakeup();
+ }
+
+ public synchronized SelectionKey registerChannel(SocketChannel channel)
+ throws IOException {
+ return channel.register(readSelector, SelectionKey.OP_READ);
+ }
+
+ public synchronized void finishAdd() {
+ adding = false;
+ this.notify();
+ }
+ }
+
/** cleanup connections from connectionList. Choose a random range
* to scan and also have a limit on the number of the connections
* that will be cleanedup per run. The criteria for cleanup is the time
@@ -367,8 +446,6 @@ public abstract class Server {
if (key.isValid()) {
if (key.isAcceptable())
doAccept(key);
- else if (key.isReadable())
- doRead(key);
}
} catch (IOException e) {
}
@@ -382,11 +459,6 @@ public abstract class Server {
closeCurrentConnection(key, e);
cleanupConnections(true);
try { Thread.sleep(60000); } catch (Exception ie) {}
- } catch (InterruptedException e) {
- if (running) { // unexpected -- log it
- LOG.info(getName() + " caught: " +
- StringUtils.stringifyException(e));
- }
} catch (Exception e) {
closeCurrentConnection(key, e);
}
@@ -429,24 +501,28 @@ public abstract class Server {
void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
Connection c = null;
ServerSocketChannel server = (ServerSocketChannel) key.channel();
- // accept up to 10 connections
- for (int i=0; i<10; i++) {
- SocketChannel channel = server.accept();
- if (channel==null) return;
-
+ SocketChannel channel;
+ while ((channel = server.accept()) != null) {
channel.configureBlocking(false);
channel.socket().setTcpNoDelay(tcpNoDelay);
- SelectionKey readKey = channel.register(selector, SelectionKey.OP_READ);
- c = new Connection(readKey, channel, System.currentTimeMillis());
- readKey.attach(c);
- synchronized (connectionList) {
- connectionList.add(numConnections, c);
- numConnections++;
+ Reader reader = getReader();
+ try {
+ reader.startAdd();
+ SelectionKey readKey = reader.registerChannel(channel);
+ c = new Connection(readKey, channel, System.currentTimeMillis());
+ readKey.attach(c);
+ synchronized (connectionList) {
+ connectionList.add(numConnections, c);
+ numConnections++;
+ }
+ if (LOG.isDebugEnabled())
+ LOG.debug("Server connection from " + c.toString() +
+ "; # active connections: " + numConnections +
+ "; # queued calls: " + callQueue.size());
+ } finally {
+ reader.finishAdd();
}
- if (LOG.isDebugEnabled())
- LOG.debug("Server connection from " + c.toString() +
- "; # active connections: " + numConnections +
- "; # queued calls: " + callQueue.size());
+
}
}
@@ -492,7 +568,16 @@ public abstract class Server {
LOG.info(getName() + ":Exception in closing listener socket. " + e);
}
}
+ readPool.shutdown();
+ }
+
+ // The method that will return the next reader to work with
+ // Simplistic implementation of round robin for now
+ Reader getReader() {
+ currentReader = (currentReader + 1) % readers.length;
+ return readers[currentReader];
}
+
}
// Sends responses of RPC back to clients.
@@ -1380,6 +1465,9 @@ public abstract class Server {
IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT);
this.maxRespSize = conf.getInt(IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY,
IPC_SERVER_RPC_MAX_RESPONSE_SIZE_DEFAULT);
+ this.readThreads = conf.getInt(
+ IPC_SERVER_RPC_READ_THREADS_KEY,
+ IPC_SERVER_RPC_READ_THREADS_DEFAULT);
this.callQueue = new LinkedBlockingQueue<Call>(maxQueueSize);
this.maxIdleTime = 2*conf.getInt("ipc.client.connection.maxidletime", 1000);
this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/ipc/TestRPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/ipc/TestRPC.java?rev=1077528&r1=1077527&r2=1077528&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/ipc/TestRPC.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/ipc/TestRPC.java Fri Mar 4 04:24:40 2011
@@ -233,8 +233,7 @@ public class TestRPC extends TestCase {
}
}
-
- public void testCalls() throws Exception {
+ public void testCalls(Configuration conf) throws Exception {
Server server = RPC.getServer(new TestImpl(), ADDRESS, 0, conf);
TestProtocol proxy = null;
try {
@@ -420,8 +419,27 @@ public class TestRPC extends TestCase {
// Reset authorization to expect failure
conf.set(ACL_CONFIG, "invalid invalid");
doRPCs(conf, true);
+
+ conf.setInt(CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY, 2);
+ // Expect to succeed
+ conf.set(ACL_CONFIG, "*");
+ doRPCs(conf, false);
+
+ // Reset authorization to expect failure
+ conf.set(ACL_CONFIG, "invalid invalid");
+ doRPCs(conf, true);
}
-
+
+ public void testNoPings() throws Exception {
+ Configuration conf = new Configuration();
+
+ conf.setBoolean("ipc.client.ping", false);
+ new TestRPC("testnoPings").testCalls(conf);
+
+ conf.setInt(CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY, 2);
+ new TestRPC("testnoPings").testCalls(conf);
+ }
+
public void testErrorMsgForInsecureClient() throws Exception {
final Server server = RPC.getServer(
new TestImpl(), ADDRESS, 0, 5, true, conf, null);
@@ -444,11 +462,36 @@ public class TestRPC extends TestCase {
}
}
assertTrue(succeeded);
+
+ conf.setInt(CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY, 2);
+
+ final Server multiServer = RPC.getServer(new TestImpl(),
+ ADDRESS, 0, 5, true, conf, null);
+ multiServer.enableSecurity();
+ multiServer.start();
+ succeeded = false;
+ final InetSocketAddress mulitServerAddr =
+ NetUtils.getConnectAddress(multiServer);
+ proxy = null;
+ try {
+ proxy = (TestProtocol) RPC.getProxy(TestProtocol.class,
+ TestProtocol.versionID, mulitServerAddr, conf);
+ } catch (RemoteException e) {
+ LOG.info("LOGGING MESSAGE: " + e.getLocalizedMessage());
+ assertTrue(e.unwrapRemoteException() instanceof AccessControlException);
+ succeeded = true;
+ } finally {
+ multiServer.stop();
+ if (proxy != null) {
+ RPC.stopProxy(proxy);
+ }
+ }
+ assertTrue(succeeded);
}
-
+
public static void main(String[] args) throws Exception {
- new TestRPC("test").testCalls();
+ new TestRPC("test").testCalls(conf);
}
}