You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@whimsical.apache.org by Sam Ruby <ru...@apache.org> on 2016/03/24 20:43:33 UTC

[whimsy.git] [1/1] Commit a426a40: experimental branch for threaded events

Commit a426a40686e967d8d679bc1663e2eb288c78d618:
    experimental branch for threaded events


Branch: refs/heads/threaded_events
Author: Sam Ruby <ru...@intertwingly.net>
Committer: Sam Ruby <ru...@intertwingly.net>
Pusher: rubys <ru...@apache.org>

------------------------------------------------------------
www/board/agenda/models/events.rb                            | ++++++++++++++ 
www/board/agenda/models/ipc.rb                               | ++++++++ -
www/board/agenda/routes.rb                                   | +++++ -----
------------------------------------------------------------
136 changes: 109 additions, 27 deletions.
------------------------------------------------------------


diff --git a/www/board/agenda/models/events.rb b/www/board/agenda/models/events.rb
index 412fd29..aa83c0f 100644
--- a/www/board/agenda/models/events.rb
+++ b/www/board/agenda/models/events.rb
@@ -4,6 +4,8 @@
 #  * Closes all sockets when restart is detected
 #
 
+require 'json'
+
 class EventService
   attr_accessor :user
   attr_accessor :token
@@ -80,6 +82,32 @@ def self.unsubscribe(token)
     end
   end
 
+  # Stream events for +user+ to a hijacked +socket+ as server-sent events; returns once :exit is popped.
+  def self.hijack(user, socket)
+    STDERR.puts 'hijacked'
+    subscription = subscribe(user)
+    loop do
+      event = pop(subscription)
+      STDERR.puts event
+      if Hash === event or Array === event
+        socket.write "data: #{JSON.dump(event)}\n\n"
+      elsif event == :heartbeat
+        socket.write ":\n" # a line starting with ':' is a comment in the event-stream format
+      elsif event == :exit
+        break
+      elsif event == nil
+        subscription = subscribe(user) # nil pop: take out a fresh subscription for the same user
+      else
+        socket.write "data: #{event.inspect}\n\n"
+      end
+      socket.flush
+    end
+  ensure
+    STDERR.puts 'done'
+    unsubscribe(subscription) # runs on normal exit and on any write/pop error
+    socket.close
+  end
+
   # When restart signal is detected, close all open connections
   def self.hook_restart
     # puma uses SIGUSR2
diff --git a/www/board/agenda/models/ipc.rb b/www/board/agenda/models/ipc.rb
index ecc6f62..1a5f4e3 100644
--- a/www/board/agenda/models/ipc.rb
+++ b/www/board/agenda/models/ipc.rb
@@ -18,7 +18,9 @@
 require 'thread'
 
 class IPC_Server
-  SOCKET = 'druby://:9146'
+  AGENDA_WORK = ARGV[1] unless defined? AGENDA_WORK
+  DRB_SOCKET = 'drbunix://' + File.expand_path('drb.sock', AGENDA_WORK)
+  HIJACK_SOCKET = File.expand_path('hijack.sock', AGENDA_WORK)
 
   attr_accessor :object
 
@@ -46,7 +48,7 @@ def self.start_server
         RbConfig::CONFIG["ruby_install_name"] + RbConfig::CONFIG["EXEEXT"]
       )
 
-      exec(ruby, __FILE__.dup.untaint, '--server-only')
+      exec(ruby, __FILE__.dup.untaint, '--server-only' , AGENDA_WORK)
     end
 
     Process.detach pid
@@ -66,24 +68,78 @@ def self.start_server
 elsif ARGV[0] == '--server-only'
 
   if  __FILE__ == $0
-    Signal.trap('INT') {sleep 1; exit}
-    Signal.trap('TERM') {exit}
-    Signal.trap('USR2') {exit}
+    STDERR.puts 'forked'
+begin
+    # daemonize
+    # Process.daemon
 
+    # clean up any sockets left behind
+    if File.exist? IPC_Server::HIJACK_SOCKET
+      begin
+        UNIXSocket.new(IPC_Server::HIJACK_SOCKET).close
+      rescue Errno::ECONNREFUSED
+        File.unlink IPC_Server::HIJACK_SOCKET
+      end
+    end
+
+    # try not to leave any sockets behind
+    at_exit do
+      STDERR.puts 'exiting'
+      begin
+        File.unlink IPC_Server::HIJACK_SOCKET
+        File.unlink DRB_SOCKET.split('//').last
+      rescue
+      end
+    end
+
+    # exit on signal
+    Signal.trap('INT') {sleep 1; STDERR.puts 'bye'; exit}
+    Signal.trap('TERM') {STDERR.puts 'bye'; exit}
+    Signal.trap('USR2') {STDERR.puts 'bye'; exit}
+
+    # event code
     require_relative 'events'
 
     begin
-      DRb.start_service(IPC_Server::SOCKET, EventService)
+      # start IPC connection to EventService
+      DRb.start_service(IPC_Server::DRB_SOCKET, EventService)
+
+      # listen for hijacked sockets
+      STDERR.puts 'starting...'
+      listener = UNIXServer.new(IPC_Server::HIJACK_SOCKET)
+      STDERR.puts 'listening...'
+
+      loop do
+        Thread.start(listener.accept) do |client|
+        STDERR.puts 'got something...'
+          # Receive message, socket, pass to EventService
+          msg, sockaddr, rflags, *controls = client.recvmsg(scm_rights: true)
+          ancdata = controls.find {|ancdata| ancdata.cmsg_is?(:SOCKET, :RIGHTS)}
+          client.close
+          STDERR.puts msg
+          STDERR.puts ancdata
+          begin
+          EventService.hijack(msg, ancdata.unix_rights[0]) if ancdata
+          rescue Exception => e
+            STDERR.puts e
+          end
+        end
+      end
+
       DRb.thread.join
     rescue Errno::EADDRINUSE => e
       exit
     end
+rescue Exception => e
+  STDERR.puts e
+  STDERR.puts e.backtrace
+end
   end
 
 else
 
   # IPC client
-  IPC = IPC_Server.new(DRbObject.new(nil, IPC_Server::SOCKET))
+  IPC = IPC_Server.new(DRbObject.new(nil, IPC_Server::DRB_SOCKET))
 
 end
 
diff --git a/www/board/agenda/routes.rb b/www/board/agenda/routes.rb
index b24deac..58e14c2 100755
--- a/www/board/agenda/routes.rb
+++ b/www/board/agenda/routes.rb
@@ -202,26 +202,24 @@
 
 # event stream for server sent events (a.k.a EventSource)
 get '/events', provides: 'text/event-stream' do
-  stream :keep_open do |out|
-    subscription = IPC.subscribe(env.user)
-    out.callback {IPC.unsubscribe(subscription)}
-
-    loop do
-      event = IPC.pop(subscription)
-      if Hash === event or Array === event
-        out << "data: #{JSON.dump(event)}\n\n"
-      elsif event == :heartbeat
-        out << ":\n"
-      elsif event == :exit
-        out.close
-        break
-      elsif event == nil
-        subscription = IPC.subscribe(env.user)
-      else
-        out << "data: #{event.inspect}\n\n"
-      end
-    end
-  end
+  # hijack io socket: invoke the rack.hijack hook, then fetch the raw client IO
+  env['rack.hijack'].call
+  io = env['rack.hijack_io']
+
+  # send HTTP status line and headers by hand on the hijacked socket
+  io.write "HTTP/1.1 200 OK\r\n"
+  io.write "Content-Type: text/event-stream\r\n"
+  io.write "\r\n"
+  io.flush
+
+  # hand off socket to event server: user is the message body, io's fd goes as SCM_RIGHTS ancillary data
+  socket = UNIXSocket.new(IPC_Server::HIJACK_SOCKET)
+  socket.sendmsg env.user, 0, nil,
+    Socket::AncillaryData.int(:UNIX, :SOCKET, :RIGHTS, io.fileno)
+  socket.close # NOTE(review): io itself is never closed here after its fd is passed; confirm this does not leak a descriptor
+
+  # for logging purposes
+  "event stream"
 end
 
 # draft minutes