You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@thrift.apache.org by kc...@apache.org on 2008/06/18 03:16:58 UTC

svn commit: r669006 - /incubator/thrift/trunk/lib/rb/lib/thrift/server/nonblockingserver.rb

Author: kclark
Date: Tue Jun 17 18:16:58 2008
New Revision: 669006

URL: http://svn.apache.org/viewvc?rev=669006&view=rev
Log:
Add synchronization around shared resources in NonblockingServer

Modified:
    incubator/thrift/trunk/lib/rb/lib/thrift/server/nonblockingserver.rb

Modified: incubator/thrift/trunk/lib/rb/lib/thrift/server/nonblockingserver.rb
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/rb/lib/thrift/server/nonblockingserver.rb?rev=669006&r1=669005&r2=669006&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/rb/lib/thrift/server/nonblockingserver.rb (original)
+++ incubator/thrift/trunk/lib/rb/lib/thrift/server/nonblockingserver.rb Tue Jun 17 18:16:58 2008
@@ -1,5 +1,5 @@
 require 'thrift/server'
-
+require 'sync'
 # thrift/server already imports fastthread/thread
 
 module Thrift
@@ -12,6 +12,11 @@
   #
   # we inherit from ThreadPoolServer for the initialize/rescuable_serve methods
   class NonblockingServer < ThreadPoolServer
+    def initialize(processor, serverTransport, transportFactory=nil, protocolFactory=nil, num=20)
+      super
+      @sync = Sync.new
+    end
+
     def serve
       @server_thread = Thread.current
       @serverTransport.listen
@@ -27,7 +32,11 @@
         thread_group = ThreadGroup.new
         loop do
           break if @shutdown
-          rd, = select([@serverTransport.handle, *connections.keys])
+          handles = [@serverTransport.handle]
+          @sync.synchronize(Sync::SH) do
+            handles.concat connections.keys
+          end
+          rd, = select(handles)
           next if rd.nil?
           rd.each do |socket|
             if socket == @serverTransport.handle
@@ -35,16 +44,25 @@
               buffer = ''
               outtrans = @transportFactory.get_transport(client)
               outprot = @protocolFactory.get_protocol(outtrans)
-              connections[client.handle] = [client, buffer, outtrans, outprot]
+              @sync.synchronize(Sync::EX) do
+                connections[client.handle] = [client, buffer, outtrans, outprot]
+              end
             else
-              client, buffer, outtrans, outprot = connections[socket]
+              client, buffer, outtrans, outprot = nil # for scope
+              @sync.synchronize(Sync::SH) do
+                client, buffer, outtrans, outprot = connections[socket]
+              end
               if socket.eof?
                 client.close
-                connections.delete(socket)
+                @sync.synchronize(Sync::EX) do
+                  connections.delete(socket)
+                end
               else
                 buffer << client.read(4096, true)
                 if has_full_frame?(buffer)
-                  running_connections[socket] = connections.delete(socket)
+                  @sync.synchronize(Sync::EX) do
+                    running_connections[socket] = connections.delete(socket)
+                  end
                   @thread_q.push :token
                   t = Thread.new(Thread.current) do |master|
                     begin
@@ -54,17 +72,27 @@
                       @processor.process(inprot, outprot)
                       if @shutdown
                         client.close
-                        running_connections.delete(socket)
+                        @sync.synchronize(Sync::EX) do
+                          running_connections.delete(socket)
+                        end
                       else
-                        swapping_connections[socket] = running_connections.delete(socket)
-                        master.wakeup
+                        @sync.synchronize(Sync::EX) do
+                          swapping_connections[socket] = running_connections.delete(socket)
+                        end
                       end
                     rescue => e
                       outtrans.close
                       @exception_q.push e
                     ensure
-                      running_connections.delete(socket)
-                      connections[socket] = swapping_connections.delete(socket) if swapping_connections.include? socket
+                      should_wakeup = false
+                      @sync.synchronize(Sync::EX) do
+                        running_connections.delete(socket)
+                        if swapping_connections.include? socket
+                          connections[socket] = swapping_connections.delete(socket)
+                          should_wakeup = true
+                        end
+                      end
+                      master.wakeup if should_wakeup
                       intrans.close
                       @thread_q.pop
                     end
@@ -77,9 +105,13 @@
         end
         if @shutdown
           @serverTransport.close
-          connections.merge! running_connections
-          connections.merge! swapping_connections
-          connections.values.each do |client, buffer, outtrans, outprot|
+          handles = []
+          @sync.synchronize(Sync::SH) do
+            handles = connections
+            handles.merge! running_connections
+            handles.merge! swapping_connections
+          end
+          handles.values.each do |client, buffer, outtrans, outprot|
             # can't close completely or we'll break active messages
             # but lets at least stop accepting input
             client.handle.close_read
@@ -97,7 +129,7 @@
           end
           thread_group.list.each { |t| t.kill } if @shutdown_kill
           # now kill connections completely if they still exists
-          connections.values.each do |client, buffer, outtrans, outprot|
+          handles.values.each do |client, buffer, outtrans, outprot|
             client.close
           end
         end