You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:24:40 UTC

svn commit: r1077528 - in /hadoop/common/branches/branch-0.20-security-patches/src: core/org/apache/hadoop/fs/CommonConfigurationKeys.java core/org/apache/hadoop/ipc/Server.java test/org/apache/hadoop/ipc/TestRPC.java

Author: omalley
Date: Fri Mar  4 04:24:40 2011
New Revision: 1077528

URL: http://svn.apache.org/viewvc?rev=1077528&view=rev
Log:
commit 048d12257c35102e92c89164906fe6af21161c0c
Author: Bharath Mundlapudi <bh...@yahoo-inc.com>
Date:   Fri Jul 2 17:37:37 2010 -0700

    HADOOP-6713 from https://issues.apache.org/jira/secure/attachment/12448613/HADOOP-6713-rel20.3.patch
    
    +++ b/YAHOO-CHANGES.txt
    +    HADOOP-6713. Multiple RPC Reader Threads (Bharathm)
    +

Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/fs/CommonConfigurationKeys.java
    hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/Server.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/ipc/TestRPC.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/fs/CommonConfigurationKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/fs/CommonConfigurationKeys.java?rev=1077528&r1=1077527&r2=1077528&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/fs/CommonConfigurationKeys.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/fs/CommonConfigurationKeys.java Fri Mar  4 04:24:40 2011
@@ -42,5 +42,9 @@ public class CommonConfigurationKeys {
   /** See src/core/core-default.xml */
   public static final String  HADOOP_SECURITY_SERVICE_USER_NAME_KEY = 
     "hadoop.security.service.user.name.key";
+  public static final String IPC_SERVER_RPC_READ_THREADS_KEY =
+                                        "ipc.server.read.threadpool.size";
+  public static final int IPC_SERVER_RPC_READ_THREADS_DEFAULT = 1;
+
 }
 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/Server.java?rev=1077528&r1=1077527&r2=1077528&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/Server.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/ipc/Server.java Fri Mar  4 04:24:40 2011
@@ -51,6 +51,8 @@ import java.util.Random;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import javax.security.sasl.Sasl;
 import javax.security.sasl.SaslException;
@@ -173,6 +175,7 @@ public abstract class Server {
   private String bindAddress; 
   private int port;                               // port we listen on
   private int handlerCount;                       // number of handler threads
+  private int readThreads;                        // number of read threads
   private Class<? extends Writable> paramClass;   // class of call parameters
   private int maxIdleTime;                        // the maximum idle time after 
                                                   // which a client may be disconnected
@@ -278,6 +281,8 @@ public abstract class Server {
     
     private ServerSocketChannel acceptChannel = null; //the accept channel
     private Selector selector = null; //the selector that we use for the server
+    private Reader[] readers = null;
+    private int currentReader = 0;
     private InetSocketAddress address; //the address we bind at
     private Random rand = new Random();
     private long lastCleanupRunTime = 0; //the last time when a cleanup connec-
@@ -285,7 +290,8 @@ public abstract class Server {
     private long cleanupInterval = 10000; //the minimum interval between 
                                           //two cleanup runs
     private int backlogLength = conf.getInt("ipc.server.listen.queue.size", 128);
-    
+    private ExecutorService readPool; 
+   
     public Listener() throws IOException {
       address = new InetSocketAddress(bindAddress, port);
       // Create a new server socket and set to non blocking mode
@@ -297,12 +303,85 @@ public abstract class Server {
       port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
       // create a selector;
       selector= Selector.open();
+      readers = new Reader[readThreads];
+      readPool = Executors.newFixedThreadPool(readThreads);
+      for (int i = 0; i < readThreads; i++) {
+        Selector readSelector = Selector.open();
+        Reader reader = new Reader(readSelector);
+        readers[i] = reader;
+        readPool.execute(reader);
+      }
 
       // Register accepts on the server socket with the selector.
       acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
       this.setName("IPC Server listener on " + port);
       this.setDaemon(true);
     }
+    
+    private class Reader implements Runnable {
+      private volatile boolean adding = false;
+      private Selector readSelector = null;
+
+      Reader(Selector readSelector) {
+        this.readSelector = readSelector;
+      }
+      public void run() {
+        LOG.info("Starting SocketReader");
+        synchronized (this) {
+          while (running) {
+            SelectionKey key = null;
+            try {
+              readSelector.select();
+              while (adding) {
+                this.wait(1000);
+              }              
+
+              Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
+              while (iter.hasNext()) {
+                key = iter.next();
+                iter.remove();
+                if (key.isValid()) {
+                  if (key.isReadable()) {
+                    doRead(key);
+                  }
+                }
+                key = null;
+              }
+            } catch (InterruptedException e) {
+              if (running) {                      // unexpected -- log it
+                LOG.info(getName() + " caught: " +
+                         StringUtils.stringifyException(e));
+              }
+            } catch (IOException ex) {
+              LOG.error("Error in Reader", ex);
+            }
+          }
+        }
+      }
+
+      /**
+       * Puts this reader into a state in which it waits for a new channel
+       * to be registered with readSelector. If the reader is blocked in
+       * select(), it is woken up; otherwise the next call to select()
+       * returns even if there is nothing to read, and the reader then
+       * waits in while(adding) until finishAdd is called.
+       */
+      public void startAdd() {
+        adding = true;
+        readSelector.wakeup();
+      }
+      
+      public synchronized SelectionKey registerChannel(SocketChannel channel)
+                                                          throws IOException {
+          return channel.register(readSelector, SelectionKey.OP_READ);
+      }
+
+      public synchronized void finishAdd() {
+        adding = false;
+        this.notify();        
+      }
+    }
+
     /** cleanup connections from connectionList. Choose a random range
      * to scan and also have a limit on the number of the connections
      * that will be cleanedup per run. The criteria for cleanup is the time
@@ -367,8 +446,6 @@ public abstract class Server {
               if (key.isValid()) {
                 if (key.isAcceptable())
                   doAccept(key);
-                else if (key.isReadable())
-                  doRead(key);
               }
             } catch (IOException e) {
             }
@@ -382,11 +459,6 @@ public abstract class Server {
           closeCurrentConnection(key, e);
           cleanupConnections(true);
           try { Thread.sleep(60000); } catch (Exception ie) {}
-        } catch (InterruptedException e) {
-          if (running) {                          // unexpected -- log it
-            LOG.info(getName() + " caught: " +
-                     StringUtils.stringifyException(e));
-          }
         } catch (Exception e) {
           closeCurrentConnection(key, e);
         }
@@ -429,24 +501,28 @@ public abstract class Server {
     void doAccept(SelectionKey key) throws IOException,  OutOfMemoryError {
       Connection c = null;
       ServerSocketChannel server = (ServerSocketChannel) key.channel();
-      // accept up to 10 connections
-      for (int i=0; i<10; i++) {
-        SocketChannel channel = server.accept();
-        if (channel==null) return;
-
+      SocketChannel channel;
+      while ((channel = server.accept()) != null) {
         channel.configureBlocking(false);
         channel.socket().setTcpNoDelay(tcpNoDelay);
-        SelectionKey readKey = channel.register(selector, SelectionKey.OP_READ);
-        c = new Connection(readKey, channel, System.currentTimeMillis());
-        readKey.attach(c);
-        synchronized (connectionList) {
-          connectionList.add(numConnections, c);
-          numConnections++;
+        Reader reader = getReader();
+        try {
+          reader.startAdd();
+          SelectionKey readKey = reader.registerChannel(channel);
+          c = new Connection(readKey, channel, System.currentTimeMillis());
+          readKey.attach(c);
+          synchronized (connectionList) {
+            connectionList.add(numConnections, c);
+            numConnections++;
+          }
+          if (LOG.isDebugEnabled())
+            LOG.debug("Server connection from " + c.toString() +
+                "; # active connections: " + numConnections +
+                "; # queued calls: " + callQueue.size());          
+        } finally {
+          reader.finishAdd(); 
         }
-        if (LOG.isDebugEnabled())
-          LOG.debug("Server connection from " + c.toString() +
-              "; # active connections: " + numConnections +
-              "; # queued calls: " + callQueue.size());
+
       }
     }
 
@@ -492,7 +568,16 @@ public abstract class Server {
           LOG.info(getName() + ":Exception in closing listener socket. " + e);
         }
       }
+      readPool.shutdown();
+    }
+
+    // The method that will return the next reader to work with
+    // Simplistic implementation of round robin for now
+    Reader getReader() {
+      currentReader = (currentReader + 1) % readers.length;
+      return readers[currentReader];
     }
+
   }
 
   // Sends responses of RPC back to clients.
@@ -1380,6 +1465,9 @@ public abstract class Server {
                                 IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT);
     this.maxRespSize = conf.getInt(IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY,
                                    IPC_SERVER_RPC_MAX_RESPONSE_SIZE_DEFAULT);
+    this.readThreads = conf.getInt(
+        IPC_SERVER_RPC_READ_THREADS_KEY,
+        IPC_SERVER_RPC_READ_THREADS_DEFAULT);
     this.callQueue  = new LinkedBlockingQueue<Call>(maxQueueSize); 
     this.maxIdleTime = 2*conf.getInt("ipc.client.connection.maxidletime", 1000);
     this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/ipc/TestRPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/ipc/TestRPC.java?rev=1077528&r1=1077527&r2=1077528&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/ipc/TestRPC.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/ipc/TestRPC.java Fri Mar  4 04:24:40 2011
@@ -233,8 +233,7 @@ public class TestRPC extends TestCase {
     }
   }
 
-
-  public void testCalls() throws Exception {
+  public void testCalls(Configuration conf) throws Exception {
     Server server = RPC.getServer(new TestImpl(), ADDRESS, 0, conf);
     TestProtocol proxy = null;
     try {
@@ -420,8 +419,27 @@ public class TestRPC extends TestCase {
     // Reset authorization to expect failure
     conf.set(ACL_CONFIG, "invalid invalid");
     doRPCs(conf, true);
+
+    conf.setInt(CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY, 2);
+    // Expect to succeed
+    conf.set(ACL_CONFIG, "*");
+    doRPCs(conf, false);
+    
+    // Reset authorization to expect failure
+    conf.set(ACL_CONFIG, "invalid invalid");
+    doRPCs(conf, true);
   }
-  
+ 
+   public void testNoPings() throws Exception {
+     Configuration conf = new Configuration();
+    
+     conf.setBoolean("ipc.client.ping", false);
+     new TestRPC("testnoPings").testCalls(conf);
+    
+     conf.setInt(CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY, 2);
+     new TestRPC("testnoPings").testCalls(conf);
+   }
+ 
   public void testErrorMsgForInsecureClient() throws Exception {
     final Server server = RPC.getServer(
         new TestImpl(), ADDRESS, 0, 5, true, conf, null);
@@ -444,11 +462,36 @@ public class TestRPC extends TestCase {
       }
     }
     assertTrue(succeeded);
+
+    conf.setInt(CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY, 2);
+
+    final Server multiServer = RPC.getServer(new TestImpl(), 
+						ADDRESS, 0, 5, true, conf, null);
+    multiServer.enableSecurity();
+    multiServer.start();
+    succeeded = false;
+    final InetSocketAddress mulitServerAddr =
+                      NetUtils.getConnectAddress(multiServer);
+    proxy = null;
+    try {
+      proxy = (TestProtocol) RPC.getProxy(TestProtocol.class,
+          TestProtocol.versionID, mulitServerAddr, conf);
+    } catch (RemoteException e) {
+      LOG.info("LOGGING MESSAGE: " + e.getLocalizedMessage());
+      assertTrue(e.unwrapRemoteException() instanceof AccessControlException);
+      succeeded = true;
+    } finally {
+      multiServer.stop();
+      if (proxy != null) {
+        RPC.stopProxy(proxy);
+      }
+    }
+    assertTrue(succeeded);
   }
-  
+ 
   public static void main(String[] args) throws Exception {
 
-    new TestRPC("test").testCalls();
+    new TestRPC("test").testCalls(conf);
 
   }
 }