You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@thrift.apache.org by kc...@apache.org on 2008/06/18 03:17:45 UTC

svn commit: r669012 - in /incubator/thrift/trunk/lib/rb: lib/thrift/server/nonblockingserver.rb spec/nonblockingserver_spec.rb

Author: kclark
Date: Tue Jun 17 18:17:44 2008
New Revision: 669012

URL: http://svn.apache.org/viewvc?rev=669012&view=rev
Log:
rb: Completely rewrite Thrift::NonblockingServer

It now has a much better and cleaner architecture, a proper persistent thread pool,
a dedicated acceptor thread, and no concurrency issues.

Modified:
    incubator/thrift/trunk/lib/rb/lib/thrift/server/nonblockingserver.rb
    incubator/thrift/trunk/lib/rb/spec/nonblockingserver_spec.rb

Modified: incubator/thrift/trunk/lib/rb/lib/thrift/server/nonblockingserver.rb
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/rb/lib/thrift/server/nonblockingserver.rb?rev=669012&r1=669011&r2=669012&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/rb/lib/thrift/server/nonblockingserver.rb (original)
+++ incubator/thrift/trunk/lib/rb/lib/thrift/server/nonblockingserver.rb Tue Jun 17 18:17:44 2008
@@ -1,161 +1,269 @@
 require 'thrift/server'
-require 'sync'
-# thrift/server already imports fastthread/thread
+require 'logger'
+require 'thread'
 
 module Thrift
   # this class expects to always use a FramedTransport for reading messages
-  #--
-  # this isn't very pretty, but we're working around the fact that FramedTransport
-  # and the processors are all written in a synchronous manner.
-  # So lets read data off the wire ourselves, check if we have a full frame, and
-  # only then hand it to the transport to parse
-  #
-  # we inherit from ThreadPoolServer for the initialize/rescuable_serve methods
-  class NonblockingServer < ThreadPoolServer
-    def initialize(processor, serverTransport, transportFactory=nil, protocolFactory=nil, num=20)
-      super
-      @sync = Sync.new
+  class NonblockingServer < Server
+    def initialize(processor, serverTransport, transportFactory=nil, protocolFactory=nil, num=20, logger = nil)
+      super(processor, serverTransport, transportFactory, protocolFactory)
+      @num_threads = num
+      if logger.nil?
+        @logger = Logger.new(STDERR)
+        @logger.level = Logger::WARN
+      else
+        @logger = logger
+      end
+      @shutdown_semaphore = Mutex.new
     end
 
     def serve
-      @server_thread = Thread.current
+      @logger.info "Starting #{self}"
       @serverTransport.listen
+      @io_manager = start_io_manager
 
       begin
-        connections = {}
-        running_connections = {}
-        # the swapping_connections stuff is to ensure the thread doesn't
-        # put the connection back into the regular list, then have the server
-        # thread process it again, then have the first thread remove it from
-        # the running_connections list
-        swapping_connections = {}
-        thread_group = ThreadGroup.new
         loop do
-          break if @shutdown
-          handles = [@serverTransport.handle]
-          @sync.synchronize(Sync::SH) do
-            handles.concat connections.keys
-          end
-          rd, = select(handles)
-          next if rd.nil?
-          rd.each do |socket|
-            if socket == @serverTransport.handle
-              client = @serverTransport.accept
-              buffer = ''
-              outtrans = @transportFactory.get_transport(client)
-              outprot = @protocolFactory.get_protocol(outtrans)
-              @sync.synchronize(Sync::EX) do
-                connections[client.handle] = [client, buffer, outtrans, outprot]
-              end
+          socket = @serverTransport.accept
+          @logger.debug "Accepted socket: #{socket.inspect}"
+          @io_manager.add_connection socket
+        end
+      rescue IOError => e
+        # we must be shutting down
+        @logger.info "#{self} is shutting down, goodbye"
+      end
+    ensure
+      @serverTransport.close
+      @io_manager.ensure_closed unless @io_manager.nil?
+    end
+
+    def shutdown(timeout = 0, block = true)
+      @shutdown_semaphore.synchronize do
+        return if @is_shutdown
+        @is_shutdown = true
+      end
+      # nonblocking is intended for calling from within a Handler
+      # but we can't change the order of operations here, so lets thread
+      shutdown_proc = lambda do
+        @io_manager.shutdown(timeout)
+        @serverTransport.close # this will break the accept loop
+      end
+      if block
+        shutdown_proc.call
+      else
+        Thread.new &shutdown_proc
+      end
+    end
+
+    private
+
+    def start_io_manager
+      iom = IOManager.new(@processor, @serverTransport, @transportFactory, @protocolFactory, @num_threads, @logger)
+      iom.spawn
+      iom
+    end
+
+    class IOManager # :nodoc:
+      def initialize(processor, serverTransport, transportFactory, protocolFactory, num, logger)
+        @processor = processor
+        @serverTransport = serverTransport
+        @transportFactory = transportFactory
+        @protocolFactory = protocolFactory
+        @num_threads = num
+        @logger = logger
+        @connections = []
+        @buffers = Hash.new { |h,k| h[k] = '' }
+        @signal_queue = Queue.new
+        @signal_pipes = IO.pipe
+        @signal_pipes[1].sync = true
+        @worker_queue = Queue.new
+        @shutdown_queue = Queue.new
+      end
+
+      def add_connection(socket)
+        signal [:connection, socket]
+      end
+
+      def spawn
+        @iom_thread = Thread.new do
+          @logger.debug "Starting #{self}"
+          run
+        end
+      end
+
+      def shutdown(timeout = 0)
+        @logger.debug "#{self} is shutting down workers"
+        @worker_queue.clear
+        @num_threads.times { @worker_queue.push [:shutdown] }
+        signal [:shutdown, timeout]
+        @shutdown_queue.pop
+        @signal_pipes[0].close
+        @signal_pipes[1].close
+        @logger.debug "#{self} is shutting down, goodbye"
+      end
+
+      def ensure_closed
+        kill_worker_threads if @worker_threads
+        @iom_thread.kill
+      end
+
+      private
+
+      def run
+        spin_worker_threads
+
+        loop do
+          rd, = select([@signal_pipes[0], *@connections])
+          if rd.delete @signal_pipes[0]
+            break if read_signals == :shutdown
+          end
+          rd.each do |fd|
+            if fd.handle.eof?
+              remove_connection fd
             else
-              client, buffer, outtrans, outprot = nil # for scope
-              @sync.synchronize(Sync::SH) do
-                client, buffer, outtrans, outprot = connections[socket]
-              end
-              if socket.eof?
-                client.close
-                @sync.synchronize(Sync::EX) do
-                  connections.delete(socket)
-                end
-              else
-                buffer << client.read(4096, true)
-                if has_full_frame?(buffer)
-                  @sync.synchronize(Sync::EX) do
-                    running_connections[socket] = connections.delete(socket)
-                  end
-                  @thread_q.push :token
-                  t = Thread.new(Thread.current) do |master|
-                    begin
-                      membuf = MemoryBuffer.new(buffer)
-                      intrans = @transportFactory.get_transport(membuf)
-                      inprot = @protocolFactory.get_protocol(intrans)
-                      @processor.process(inprot, outprot)
-                      if @shutdown
-                        client.close
-                        @sync.synchronize(Sync::EX) do
-                          running_connections.delete(socket)
-                        end
-                      else
-                        @sync.synchronize(Sync::EX) do
-                          swapping_connections[socket] = running_connections.delete(socket)
-                        end
-                      end
-                    rescue => e
-                      outtrans.close
-                      @exception_q.push e
-                    ensure
-                      should_wakeup = false
-                      @sync.synchronize(Sync::EX) do
-                        running_connections.delete(socket)
-                        if swapping_connections.include? socket
-                          connections[socket] = swapping_connections.delete(socket)
-                          should_wakeup = true
-                        end
-                      end
-                      master.wakeup if should_wakeup
-                      intrans.close
-                      @thread_q.pop
-                    end
-                  end
-                  thread_group.add t
-                end
-              end
+              read_connection fd
             end
           end
         end
-        if @shutdown
-          @serverTransport.close
-          handles = []
-          @sync.synchronize(Sync::SH) do
-            handles = connections
-            handles.merge! running_connections
-            handles.merge! swapping_connections
-          end
-          handles.values.each do |client, buffer, outtrans, outprot|
-            # can't close completely or we'll break active messages
-            # but lets at least stop accepting input
-            client.handle.close_read
-          end
-          start = Time.now.to_f
-          until thread_group.list.empty?
-            if @shutdown_timeout
-              now = Time.now.to_f
-              cur_timeout = @shutdown_timeout - (now - start)
-              break if cur_timeout <= 0
-              thread_group.list.first.join(cur_timeout)
-            else
-              thread_group.list.first.join
+        join_worker_threads(@shutdown_timeout)
+      ensure
+        @shutdown_queue.push :shutdown
+      end
+
+      def read_connection(fd)
+        buffer = ''
+        begin
+          buffer << fd.read_nonblock(4096) while true
+        rescue Errno::EAGAIN, EOFError
+          @buffers[fd] << buffer
+        end
+        frame = slice_frame!(@buffers[fd])
+        if frame
+          @worker_queue.push [:frame, fd, frame]
+        end
+      end
+
+      def spin_worker_threads
+        @logger.debug "#{self} is spinning up worker threads"
+        @worker_threads = []
+        @num_threads.times do
+          @worker_threads << spin_thread
+        end
+      end
+
+      def spin_thread
+        Worker.new(@processor, @transportFactory, @protocolFactory, @logger, @worker_queue).spawn
+      end
+
+      def signal(msg)
+        @signal_queue << msg
+        @signal_pipes[1].write " "
+      end
+
+      def read_signals
+        # clear the signal pipe
+        begin
+          @signal_pipes[0].read_nonblock(1024) while true
+        rescue Errno::EAGAIN
+        end
+        # now read the signals
+        begin
+          loop do
+            signal, obj = @signal_queue.pop(true)
+            case signal
+            when :connection
+              @connections << obj
+            when :shutdown
+              @shutdown_timeout = obj
+              return :shutdown
             end
           end
-          thread_group.list.each { |t| t.kill } if @shutdown_kill
-          # now kill connections completely if they still exists
-          handles.values.each do |client, buffer, outtrans, outprot|
-            client.close
+        rescue ThreadError
+          # out of signals
+        end
+      end
+
+      def remove_connection(fd)
+        # don't explicitly close it, a thread may still be writing to it
+        @connections.delete fd
+        @buffers.delete fd
+      end
+
+      def join_worker_threads(shutdown_timeout)
+        start = Time.now
+        @worker_threads.each do |t|
+          if shutdown_timeout > 0
+            timeout = Time.now - (start + shutdown_timeout)
+            break if timeout <= 0
+            t.join(timeout)
+          else
+            t.join
           end
         end
-      ensure
-        @serverTransport.close
+        kill_worker_threads
       end
-    end
 
-    # Stop accepting new messages and wait for active messages to finish
-    # If the given timeout passes without the active messages finishing,
-    # control will exit from #serve and leave the remaining threads active.
-    # If you pass true for kill, the remaining threads will be reaped instead.
-    # A false timeout means wait indefinitely
-    def shutdown(timeout = nil, kill = false)
-      @shutdown_timeout = timeout
-      @shutdown_kill = kill
-      @shutdown = true
-      @server_thread.wakeup
-    end
+      def kill_worker_threads
+        @worker_threads.each do |t|
+          t.kill if t.status
+        end
+        @worker_threads.clear
+      end
 
-    private
+      def slice_frame!(buf)
+        if buf.length >= 4
+          size = buf.unpack('N').first
+          if buf.length >= size + 4
+            buf.slice!(0, size + 4)
+          else
+            nil
+          end
+        else
+          nil
+        end
+      end
+
+      class Worker # :nodoc:
+        def initialize(processor, transportFactory, protocolFactory, logger, queue)
+          @processor = processor
+          @transportFactory = transportFactory
+          @protocolFactory = protocolFactory
+          @logger = logger
+          @queue = queue
+        end
 
-    def has_full_frame?(buf)
-      return no unless buf.length >= 4
-      size = buf.unpack('N').first
-      size + 4 <= buf.length
+        def spawn
+          Thread.new do
+            @logger.debug "#{self} is spawning"
+            run
+          end
+        end
+
+        private
+
+        def run
+          loop do
+            cmd, *args = @queue.pop
+            case cmd
+            when :shutdown
+              @logger.debug "#{self} is shutting down, goodbye"
+              break
+            when :frame
+              fd, frame = args
+              begin
+                otrans = @transportFactory.get_transport(fd)
+                oprot = @protocolFactory.get_protocol(otrans)
+                membuf = MemoryBuffer.new(frame)
+                itrans = @transportFactory.get_transport(membuf)
+                iprot = @protocolFactory.get_protocol(itrans)
+                @processor.process(iprot, oprot)
+              rescue => e
+                @logger.error "#{Thread.current.inspect} raised error: #{e.inspect}\n#{e.backtrace.join("\n")}"
+              end
+            end
+          end
+        end
+      end
     end
   end
-end
+end
\ No newline at end of file

Modified: incubator/thrift/trunk/lib/rb/spec/nonblockingserver_spec.rb
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/rb/spec/nonblockingserver_spec.rb?rev=669012&r1=669011&r2=669012&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/rb/spec/nonblockingserver_spec.rb (original)
+++ incubator/thrift/trunk/lib/rb/spec/nonblockingserver_spec.rb Tue Jun 17 18:17:44 2008
@@ -35,139 +35,189 @@
     end
 
     def shutdown
-      @server.shutdown
+      @server.shutdown(0, false)
     end
   end
 
-  before(:each) do
-    @port = 43251
-    handler = Handler.new
-    processor = NonblockingService::Processor.new(handler)
-    @transport = ServerSocket.new('localhost', @port)
-    transportFactory = FramedTransportFactory.new
-    @server = NonblockingServer.new(processor, @transport, transportFactory, nil, 5)
-    handler.server = @server
-    @server_thread = Thread.new do
-      begin
-        @server.serve
-      rescue => e
-        p e
-        puts e.backtrace * "\n"
-        raise e
-      end
+  class SpecTransport < Transport
+    def initialize(transport, queue)
+      @transport = transport
+      @queue = queue
+      @flushed = false
     end
-    Thread.pass
 
-    @clients = []
-  end
+    def open?
+      @transport.open?
+    end
 
-  after(:each) do
-    @clients.each { |client, trans| trans.close }
-    @server_thread.kill
-    @transport.close
-  end
+    def open
+      @transport.open
+    end
+
+    def close
+      @transport.close
+    end
+
+    def read(sz)
+      @transport.read(sz)
+    end
+
+    def write(buf,sz=nil)
+      @transport.write(buf, sz)
+    end
 
-  def setup_client
-    transport = FramedTransport.new(Socket.new('localhost', @port))
-    protocol = BinaryProtocol.new(transport)
-    client = NonblockingService::Client.new(protocol)
-    transport.open
-    @clients << [client, transport]
-    client
+    def flush
+      @queue.push :flushed unless @flushed or @queue.nil?
+      @flushed = true
+      @transport.flush
+    end
   end
 
-  def setup_client_thread(result)
-    queue = Queue.new
-    Thread.new do
-      client = setup_client
-      while (msg = queue.pop)
-        case msg
-        when :block
-          result << client.block
-        when :unblock
-          client.unblock
-        when :hello
-          result << client.greeting(true) # ignore result
-        when :sleep
-          client.sleep(0.5)
-          result << :slept
-        when :shutdown
-          client.shutdown
-        when :exit
-          result << :done
-          break
+  describe Thrift::NonblockingServer do
+    before(:each) do
+      @port = 43251
+      handler = Handler.new
+      processor = NonblockingService::Processor.new(handler)
+      @transport = ServerSocket.new('localhost', @port)
+      transportFactory = FramedTransportFactory.new
+      logger = Logger.new(STDERR)
+      logger.level = Logger::WARN
+      @server = NonblockingServer.new(processor, @transport, transportFactory, nil, 5, logger)
+      handler.server = @server
+      @server_thread = Thread.new(Thread.current) do |master_thread|
+        begin
+          @server.serve
+        rescue => e
+          p e
+          puts e.backtrace * "\n"
+          master_thread.raise e
         end
       end
-      @clients.each { |c,t| t.close and break if c == client } #close the transport
+      Thread.pass
+
+      @clients = []
+      @catch_exceptions = false
     end
-    queue
-  end
 
-  it "should handle basic message passing" do
-    client = setup_client
-    client.greeting(true).should == Hello.new
-    client.greeting(false).should == Hello.new(:greeting => 'Aloha!')
-  end
+    after(:each) do
+      @clients.each { |client, trans| trans.close }
+      # @server.shutdown(1)
+      @server_thread.kill
+      @transport.close
+    end
 
-  it "should handle concurrent clients" do
-    queue = Queue.new
-    4.times { Thread.new { queue.push setup_client.block } }
-    setup_client.unblock
-    4.times { queue.pop.should be_true }
-  end
+    def setup_client(queue = nil)
+      transport = SpecTransport.new(FramedTransport.new(Socket.new('localhost', @port)), queue)
+      protocol = BinaryProtocol.new(transport)
+      client = NonblockingService::Client.new(protocol)
+      transport.open
+      @clients << [client, transport]
+      client
+    end
 
-  it "should handle messages from more than 5 long-lived connections" do
-    queues = []
-    result = Queue.new
-    7.times do |i|
-      queues << setup_client_thread(result)
-      Thread.pass if i == 4 # give the server time to accept connections
-    end
-    client = setup_client
-    # block 4 connections
-    4.times { |i| queues[i] << :block }
-    queues[4] << :hello
-    queues[5] << :hello
-    queues[6] << :hello
-    3.times { result.pop.should == Hello.new }
-    client.greeting(true).should == Hello.new
-    queues[5] << :unblock
-    4.times { result.pop.should be_true }
-    queues[2] << :hello
-    result.pop.should == Hello.new
-    client.greeting(false).should == Hello.new(:greeting => 'Aloha!')
-    7.times { queues.shift << :exit }
-    client.greeting(true).should == Hello.new
-  end
+    def setup_client_thread(result)
+      queue = Queue.new
+      Thread.new do
+        begin
+          client = setup_client
+          while (msg = queue.pop)
+            case msg
+            when :block
+              result << client.block
+            when :unblock
+              client.unblock
+            when :hello
+              result << client.greeting(true) # ignore result
+            when :sleep
+              client.sleep(0.5)
+              result << :slept
+            when :shutdown
+              client.shutdown
+            when :exit
+              result << :done
+              break
+            end
+          end
+          @clients.each { |c,t| t.close and break if c == client } #close the transport
+        rescue => e
+          raise e unless @catch_exceptions
+        end
+      end
+      queue
+    end
 
-  it "should shut down when asked" do
-    @server.shutdown
-    @server_thread.join(2).should be_an_instance_of(Thread)
-  end
+    it "should handle basic message passing" do
+      client = setup_client
+      client.greeting(true).should == Hello.new
+      client.greeting(false).should == Hello.new(:greeting => 'Aloha!')
+    end
 
-  it "should continue processing active messages when shutting down" do
-    result = Queue.new
-    client = setup_client_thread(result)
-    client << :sleep
-    sleep 0.1 # give the server time to start processing the client's message
-    @server.shutdown
-    @server_thread.join(2).should be_an_instance_of(Thread)
-    result.pop.should == :slept
-  end
+    it "should handle concurrent clients" do
+      queue = Queue.new
+      trans_queue = Queue.new
+      4.times { Thread.new { queue.push setup_client(trans_queue).block } }
+      4.times { trans_queue.pop }
+      setup_client.unblock
+      4.times { queue.pop.should be_true }
+    end
 
-  it "should kill active messages when they don't expire while shutting down" do
-    result = Queue.new
-    client = setup_client_thread(result)
-    client << :block
-    sleep 0.1 # start processing the client's message
-    @server.shutdown(1, true)
-    @server_thread.join(3).should_not be_nil
-  end
+    it "should handle messages from more than 5 long-lived connections" do
+      queues = []
+      result = Queue.new
+      7.times do |i|
+        queues << setup_client_thread(result)
+        Thread.pass if i == 4 # give the server time to accept connections
+      end
+      client = setup_client
+      # block 4 connections
+      4.times { |i| queues[i] << :block }
+      queues[4] << :hello
+      queues[5] << :hello
+      queues[6] << :hello
+      3.times { result.pop.should == Hello.new }
+      client.greeting(true).should == Hello.new
+      queues[5] << :unblock
+      4.times { result.pop.should be_true }
+      queues[2] << :hello
+      result.pop.should == Hello.new
+      client.greeting(false).should == Hello.new(:greeting => 'Aloha!')
+      7.times { queues.shift << :exit }
+      client.greeting(true).should == Hello.new
+    end
 
-  it "should allow shutting down in response to a message" do
-    client = setup_client
-    client.greeting(true).should == Hello.new
-    client.shutdown
-    @server_thread.join(2).should_not be_nil
+    it "should shut down when asked" do
+      # connect first to ensure it's running
+      client = setup_client
+      client.greeting(false) # force a message pass
+      @server.shutdown
+      @server_thread.join(2).should be_an_instance_of(Thread)
+    end
+
+    it "should continue processing active messages when shutting down" do
+      result = Queue.new
+      client = setup_client_thread(result)
+      client << :sleep
+      sleep 0.1 # give the server time to start processing the client's message
+      @server.shutdown
+      @server_thread.join(2).should be_an_instance_of(Thread)
+      result.pop.should == :slept
+    end
+
+    it "should kill active messages when they don't expire while shutting down" do
+      result = Queue.new
+      client = setup_client_thread(result)
+      client << :block
+      sleep 0.1 # start processing the client's message
+      @server.shutdown(1)
+      @catch_exceptions = true
+      @server_thread.join(3).should_not be_nil
+    end
+
+    it "should allow shutting down in response to a message" do
+      client = setup_client
+      client.greeting(true).should == Hello.new
+      client.shutdown
+      @server_thread.join(2).should_not be_nil
+    end
   end
 end