You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@thrift.apache.org by kc...@apache.org on 2008/06/18 03:16:03 UTC

svn commit: r668999 - in /incubator/thrift/trunk/lib/rb: lib/thrift/server/nonblockingserver.rb spec/ThriftSpec.thrift spec/gen-rb/NonblockingService.rb spec/nonblockingserver_spec.rb

Author: kclark
Date: Tue Jun 17 18:16:02 2008
New Revision: 668999

URL: http://svn.apache.org/viewvc?rev=668999&view=rev
Log:
Implement NonblockingServer and add specs

Added:
    incubator/thrift/trunk/lib/rb/lib/thrift/server/nonblockingserver.rb
    incubator/thrift/trunk/lib/rb/spec/gen-rb/NonblockingService.rb
    incubator/thrift/trunk/lib/rb/spec/nonblockingserver_spec.rb
Modified:
    incubator/thrift/trunk/lib/rb/spec/ThriftSpec.thrift

Added: incubator/thrift/trunk/lib/rb/lib/thrift/server/nonblockingserver.rb
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/rb/lib/thrift/server/nonblockingserver.rb?rev=668999&view=auto
==============================================================================
--- incubator/thrift/trunk/lib/rb/lib/thrift/server/nonblockingserver.rb (added)
+++ incubator/thrift/trunk/lib/rb/lib/thrift/server/nonblockingserver.rb Tue Jun 17 18:16:02 2008
@@ -0,0 +1,129 @@
+require 'thrift/server'
+
+# thrift/server already imports fastthread/thread
+
+module Thrift
+  # this class expects to always use a FramedTransport for reading messages
+  #--
+  # this isn't very pretty, but we're working around the fact that FramedTransport
+  # and the processors are all written in a synchronous manner.
+  # So let's read data off the wire ourselves, check if we have a full frame, and
+  # only then hand it to the transport to parse
+  #
+  # we inherit from ThreadPoolServer for the initialize/rescuable_serve methods
+  class NonblockingServer < ThreadPoolServer
+    def serve
+      @server_thread = Thread.current
+      @serverTransport.listen
+
+      begin
+        connections = {}
+        running_connections = {}
+        # the swapping_connections stuff is to ensure the thread doesn't
+        # put the connection back into the regular list, then have the server
+        # thread process it again, then have the first thread remove it from
+        # the running_connections list
+        swapping_connections = {}
+        thread_group = ThreadGroup.new
+        loop do
+          break if @shutdown
+          rd, = select([@serverTransport.handle, *connections.keys])
+          next if rd.nil?
+          rd.each do |socket|
+            if socket == @serverTransport.handle
+              client = @serverTransport.accept
+              buffer = ''
+              outtrans = @transportFactory.get_transport(client)
+              outprot = @protocolFactory.get_protocol(outtrans)
+              connections[client.handle] = [client, buffer, outtrans, outprot]
+            else
+              client, buffer, outtrans, outprot = connections[socket]
+              if socket.eof?
+                client.close
+                connections.delete(socket)
+              else
+                buffer << client.read(4096, true)
+                if has_full_frame?(buffer)
+                  running_connections[socket] = connections.delete(socket)
+                  @thread_q.push :token
+                  t = Thread.new(Thread.current) do |master|
+                    begin
+                      membuf = MemoryBuffer.new(buffer)
+                      intrans = @transportFactory.get_transport(membuf)
+                      inprot = @protocolFactory.get_protocol(intrans)
+                      @processor.process(inprot, outprot)
+                      if @shutdown
+                        client.close
+                        running_connections.delete(socket)
+                      else
+                        swapping_connections[socket] = running_connections.delete(socket)
+                        master.wakeup
+                      end
+                    rescue => e
+                      outtrans.close
+                      @exception_q.push e
+                    ensure
+                      running_connections.delete(socket)
+                      connections[socket] = swapping_connections.delete(socket) if swapping_connections.include? socket
+                      intrans.close
+                      @thread_q.pop
+                    end
+                  end
+                  thread_group.add t
+                end
+              end
+            end
+          end
+        end
+        if @shutdown
+          @serverTransport.close
+          connections.merge! running_connections
+          connections.merge! swapping_connections
+          connections.values.each do |client, buffer, outtrans, outprot|
+            # can't close completely or we'll break active messages
+            # but lets at least stop accepting input
+            client.handle.close_read
+          end
+          start = Time.now.to_f
+          until thread_group.list.empty?
+            if @shutdown_timeout
+              now = Time.now.to_f
+              cur_timeout = @shutdown_timeout - (now - start)
+              break if cur_timeout <= 0
+              thread_group.list.first.join(cur_timeout)
+            else
+              thread_group.list.first.join
+            end
+          end
+          thread_group.list.each { |t| t.kill } if @shutdown_kill
+          # now kill connections completely if they still exists
+          connections.values.each do |client, buffer, outtrans, outprot|
+            client.close
+          end
+        end
+      ensure
+        @serverTransport.close
+      end
+    end
+
+    # Stop accepting new messages and wait for active messages to finish
+    # If the given timeout passes without the active messages finishing,
+    # control will exit from #serve and leave the remaining threads active.
+    # If you pass true for kill, the remaining threads will be reaped instead.
+    # A false timeout means wait indefinitely
+    def shutdown(timeout = nil, kill = false)
+      @shutdown_timeout = timeout
+      @shutdown_kill = kill
+      @shutdown = true
+      @server_thread.wakeup
+    end
+
+    private
+
+    def has_full_frame?(buf)
+      return no unless buf.length >= 4
+      size = buf.unpack('N').first
+      size + 4 <= buf.length
+    end
+  end
+end

Modified: incubator/thrift/trunk/lib/rb/spec/ThriftSpec.thrift
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/rb/spec/ThriftSpec.thrift?rev=668999&r1=668998&r2=668999&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/rb/spec/ThriftSpec.thrift (original)
+++ incubator/thrift/trunk/lib/rb/spec/ThriftSpec.thrift Tue Jun 17 18:16:02 2008
@@ -16,3 +16,11 @@
 struct BoolStruct {
   1: bool yesno = 1
 }
+
+service NonblockingService {
+  Hello greeting(1:bool english)
+  bool block()
+  async void unblock() # async: the generated Ruby client only sends, it has no recv_unblock
+  async void shutdown() # async: the generated Ruby client only sends, it has no recv_shutdown
+  void sleep(1:double seconds)
+}

Added: incubator/thrift/trunk/lib/rb/spec/gen-rb/NonblockingService.rb
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/rb/spec/gen-rb/NonblockingService.rb?rev=668999&view=auto
==============================================================================
--- incubator/thrift/trunk/lib/rb/spec/gen-rb/NonblockingService.rb (added)
+++ incubator/thrift/trunk/lib/rb/spec/gen-rb/NonblockingService.rb Tue Jun 17 18:16:02 2008
@@ -0,0 +1,192 @@
+#
+# Autogenerated by Thrift
+#
+# DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+#
+
+require 'thrift/protocol'
+require 'thrift'
+require 'ThriftSpec_types'
+
+    module SpecNamespace
+      module NonblockingService
+        class Client # one send_*/recv_* pair per service method; async methods only get send_*
+          include Thrift::Client
+
+          def greeting(english)
+            send_greeting(english)
+            return recv_greeting()
+          end
+
+          def send_greeting(english)
+            send_message('greeting', Greeting_args, :english => english)
+          end
+
+          def recv_greeting()
+            result = receive_message(Greeting_result)
+            return result.success unless result.success.nil?
+            raise Thrift::ApplicationException.new(Thrift::ApplicationException::MISSING_RESULT, 'greeting failed: unknown result')
+          end
+
+          def block()
+            send_block()
+            return recv_block()
+          end
+
+          def send_block()
+            send_message('block', Block_args)
+          end
+
+          def recv_block()
+            result = receive_message(Block_result)
+            return result.success unless result.success.nil?
+            raise Thrift::ApplicationException.new(Thrift::ApplicationException::MISSING_RESULT, 'block failed: unknown result')
+          end
+
+          def unblock() # async: sends the call and does not read a reply
+            send_unblock()
+          end
+
+          def send_unblock()
+            send_message('unblock', Unblock_args)
+          end
+          def shutdown() # async: sends the call and does not read a reply
+            send_shutdown()
+          end
+
+          def send_shutdown()
+            send_message('shutdown', Shutdown_args)
+          end
+          def sleep(seconds)
+            send_sleep(seconds)
+            recv_sleep()
+          end
+
+          def send_sleep(seconds)
+            send_message('sleep', Sleep_args, :seconds => seconds)
+          end
+
+          def recv_sleep()
+            result = receive_message(Sleep_result)
+            return # void method: the reply is read but nothing is returned
+          end
+
+        end
+
+        class Processor # process_* methods read the args, call @handler and (unless async) write a result
+          include Thrift::Processor
+
+          def process_greeting(seqid, iprot, oprot)
+            args = read_args(iprot, Greeting_args)
+            result = Greeting_result.new()
+            result.success = @handler.greeting(args.english)
+            write_result(result, oprot, 'greeting', seqid)
+          end
+
+          def process_block(seqid, iprot, oprot)
+            args = read_args(iprot, Block_args)
+            result = Block_result.new()
+            result.success = @handler.block()
+            write_result(result, oprot, 'block', seqid)
+          end
+
+          def process_unblock(seqid, iprot, oprot)
+            args = read_args(iprot, Unblock_args)
+            @handler.unblock()
+            return # async: no result is written to oprot
+          end
+
+          def process_shutdown(seqid, iprot, oprot)
+            args = read_args(iprot, Shutdown_args)
+            @handler.shutdown()
+            return # async: no result is written to oprot
+          end
+
+          def process_sleep(seqid, iprot, oprot)
+            args = read_args(iprot, Sleep_args)
+            result = Sleep_result.new()
+            @handler.sleep(args.seconds)
+            write_result(result, oprot, 'sleep', seqid)
+          end
+
+        end
+
+        # HELPER FUNCTIONS AND STRUCTURES
+
+        class Greeting_args
+          include Thrift::Struct
+          attr_accessor :english
+          FIELDS = {
+            1 => {:type => Thrift::Types::BOOL, :name => 'english'}
+          }
+        end
+
+        class Greeting_result
+          include Thrift::Struct
+          attr_accessor :success
+          FIELDS = {
+            0 => {:type => Thrift::Types::STRUCT, :name => 'success', :class => Hello}
+          }
+        end
+
+        class Block_args
+          include Thrift::Struct
+          FIELDS = {
+
+          }
+        end
+
+        class Block_result
+          include Thrift::Struct
+          attr_accessor :success
+          FIELDS = {
+            0 => {:type => Thrift::Types::BOOL, :name => 'success'}
+          }
+        end
+
+        class Unblock_args
+          include Thrift::Struct
+          FIELDS = {
+
+          }
+        end
+
+        class Unblock_result # not referenced by Client or Processor in this file
+          include Thrift::Struct
+          FIELDS = {
+
+          }
+        end
+
+        class Shutdown_args
+          include Thrift::Struct
+          FIELDS = {
+
+          }
+        end
+
+        class Shutdown_result # not referenced by Client or Processor in this file
+          include Thrift::Struct
+          FIELDS = {
+
+          }
+        end
+
+        class Sleep_args
+          include Thrift::Struct
+          attr_accessor :seconds
+          FIELDS = {
+            1 => {:type => Thrift::Types::DOUBLE, :name => 'seconds'}
+          }
+        end
+
+        class Sleep_result
+          include Thrift::Struct
+          FIELDS = {
+
+          }
+        end
+
+      end
+
+    end

Added: incubator/thrift/trunk/lib/rb/spec/nonblockingserver_spec.rb
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/rb/spec/nonblockingserver_spec.rb?rev=668999&view=auto
==============================================================================
--- incubator/thrift/trunk/lib/rb/spec/nonblockingserver_spec.rb (added)
+++ incubator/thrift/trunk/lib/rb/spec/nonblockingserver_spec.rb Tue Jun 17 18:16:02 2008
@@ -0,0 +1,166 @@
+require File.dirname(__FILE__) + '/spec_helper'
+require 'thrift/server/nonblockingserver'
+$:.unshift File.dirname(__FILE__) + '/gen-rb'
+require 'NonblockingService'
+
+class ThriftNonblockingServerSpec < Spec::ExampleGroup
+  include Thrift
+  include SpecNamespace
+
+  class Handler # service implementation the specs run against
+    def initialize
+      @queue = Queue.new # callers of #block wait on this until #unblock feeds it
+    end
+
+    attr_accessor :server
+
+    def greeting(english)
+      if english
+        SpecNamespace::Hello.new
+      else
+        SpecNamespace::Hello.new(:greeting => "Aloha!")
+      end
+    end
+
+    def block
+      @queue.pop
+    end
+
+    def unblock
+      @queue.num_waiting.times { @queue.push true } # one push per thread currently waiting in #block
+    end
+
+    def sleep(time)
+      Kernel.sleep time
+    end
+
+    def shutdown
+      @server.shutdown
+    end
+  end
+
+  before(:each) do
+    @port = 43251
+    handler = Handler.new
+    processor = NonblockingService::Processor.new(handler)
+    @transport = ServerSocket.new('localhost', @port)
+    transportFactory = FramedTransportFactory.new
+    @server = NonblockingServer.new(processor, @transport, transportFactory, nil, 5) # NOTE(review): 5 is presumably the worker-thread limit - confirm against ThreadPoolServer#initialize
+    handler.server = @server
+    @server_thread = Thread.new do
+      begin
+        @server.serve
+      rescue => e
+        p e
+        puts e.backtrace * "\n"
+        raise e
+      end
+    end
+    Thread.pass # yield so the server thread gets a chance to run
+
+    @clients = [] # [client, transport] pairs, closed again in after(:each)
+  end
+
+  after(:each) do
+    @clients.each { |client, trans| trans.close }
+    @server_thread.kill
+    @transport.close
+  end
+
+  def setup_client
+    transport = FramedTransport.new(Socket.new('localhost', @port))
+    protocol = BinaryProtocol.new(transport)
+    client = NonblockingService::Client.new(protocol)
+    transport.open
+    @clients << [client, transport]
+    client
+  end
+
+  def setup_client_thread(result) # returns the command queue that drives the new client thread
+    queue = Queue.new
+    Thread.new do
+      client = setup_client
+      while (msg = queue.pop)
+        case msg
+        when :block
+          result << client.block
+        when :unblock
+          client.unblock
+        when :hello
+          result << client.greeting(true) # pushed so the spec can compare it with Hello.new
+        when :sleep
+          client.sleep(0.5)
+          result << :slept
+        when :shutdown
+          client.shutdown
+        when :exit
+          result << :done
+          break
+        end
+      end
+      @clients.each { |c,t| t.close and break if c == client } #close the transport
+    end
+    queue
+  end
+
+  it "should handle basic message passing" do
+    client = setup_client
+    client.greeting(true).should == Hello.new
+    client.greeting(false).should == Hello.new(:greeting => 'Aloha!')
+  end
+
+  it "should handle concurrent clients" do
+    queue = Queue.new
+    4.times { Thread.new { queue.push setup_client.block } }
+    setup_client.unblock
+    4.times { queue.pop.should be_true }
+  end
+
+  it "should handle messages from more than 5 long-lived connections" do
+    queues = []
+    result = Queue.new
+    7.times do |i|
+      queues << setup_client_thread(result)
+      Thread.pass if i == 4 # give the server time to accept connections
+    end
+    client = setup_client
+    # block 4 connections
+    4.times { |i| queues[i] << :block }
+    queues[4] << :hello
+    queues[5] << :hello
+    queues[6] << :hello
+    3.times { result.pop.should == Hello.new }
+    client.greeting(true).should == Hello.new
+    queues[5] << :unblock
+    4.times { result.pop.should be_true } # the four blocked calls return true once released
+    queues[2] << :hello
+    result.pop.should == Hello.new
+    client.greeting(false).should == Hello.new(:greeting => 'Aloha!')
+    7.times { queues.shift << :exit }
+    client.greeting(true).should == Hello.new
+  end
+
+  it "should shut down when asked" do
+    @server.shutdown
+    @server_thread.join(2).should be_an_instance_of(Thread) # join returns the thread if it ended in time, nil on timeout
+  end
+
+  it "should continue processing active messages when shutting down" do
+    result = Queue.new
+    client = setup_client_thread(result)
+    client << :sleep
+    sleep 0.1 # give the server time to start processing the client's message
+    @server.shutdown
+    @server_thread.join(2).should be_an_instance_of(Thread)
+    result.pop.should == :slept
+  end
+
+  it "should kill active messages when they don't expire while shutting down" do
+    result = Queue.new
+    client = setup_client_thread(result)
+    client << :block
+    sleep 0.1 # start processing the client's message
+    @server.shutdown(1, true) # 1 second timeout, then kill the remaining worker threads
+    @server_thread.join(3).should_not be_nil
+  end
+end