You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@whimsical.apache.org by Sam Ruby <ru...@apache.org> on 2016/03/24 20:43:33 UTC
[whimsy.git] [1/1] Commit a426a40: experimental branch for threaded events
Commit a426a40686e967d8d679bc1663e2eb288c78d618:
experimental branch for threaded events
Branch: refs/heads/threaded_events
Author: Sam Ruby <ru...@intertwingly.net>
Committer: Sam Ruby <ru...@intertwingly.net>
Pusher: rubys <ru...@apache.org>
------------------------------------------------------------
www/board/agenda/models/events.rb | ++++++++++++++
www/board/agenda/models/ipc.rb | ++++++++ -
www/board/agenda/routes.rb | +++++ -----
------------------------------------------------------------
136 changes: 109 additions, 27 deletions.
------------------------------------------------------------
diff --git a/www/board/agenda/models/events.rb b/www/board/agenda/models/events.rb
index 412fd29..aa83c0f 100644
--- a/www/board/agenda/models/events.rb
+++ b/www/board/agenda/models/events.rb
@@ -4,6 +4,8 @@
# * Closes all sockets when restart is detected
#
+require 'json'
+
class EventService
attr_accessor :user
attr_accessor :token
@@ -80,6 +82,32 @@ def self.unsubscribe(token)
end
end
+ # send events to a hijacked socket
+ def self.hijack(user, socket)
+ STDERR.puts 'hijacked'
+ subscription = subscribe(user)
+ loop do
+ event = pop(subscription)
+ STDERR.puts event
+ if Hash === event or Array === event
+ socket.write "data: #{JSON.dump(event)}\n\n"
+ elsif event == :heartbeat
+ socket.write ":\n"
+ elsif event == :exit
+ break
+ elsif event == nil
+ subscription = subscribe(env.user)
+ else
+ socket.write "data: #{event.inspect}\n\n"
+ end
+ socket.flush
+ end
+ ensure
+ STDERR.puts 'done'
+ unsubscribe(subscription)
+ socket.close
+ end
+
# When restart signal is detected, close all open connections
def self.hook_restart
# puma uses SIGUSR2
diff --git a/www/board/agenda/models/ipc.rb b/www/board/agenda/models/ipc.rb
index ecc6f62..1a5f4e3 100644
--- a/www/board/agenda/models/ipc.rb
+++ b/www/board/agenda/models/ipc.rb
@@ -18,7 +18,9 @@
require 'thread'
class IPC_Server
- SOCKET = 'druby://:9146'
+ AGENDA_WORK = ARGV[1] unless defined? AGENDA_WORK
+ DRB_SOCKET = 'drbunix://' + File.expand_path('drb.sock', AGENDA_WORK)
+ HIJACK_SOCKET = File.expand_path('hijack.sock', AGENDA_WORK)
attr_accessor :object
@@ -46,7 +48,7 @@ def self.start_server
RbConfig::CONFIG["ruby_install_name"] + RbConfig::CONFIG["EXEEXT"]
)
- exec(ruby, __FILE__.dup.untaint, '--server-only')
+ exec(ruby, __FILE__.dup.untaint, '--server-only' , AGENDA_WORK)
end
Process.detach pid
@@ -66,24 +68,78 @@ def self.start_server
elsif ARGV[0] == '--server-only'
if __FILE__ == $0
- Signal.trap('INT') {sleep 1; exit}
- Signal.trap('TERM') {exit}
- Signal.trap('USR2') {exit}
+ STDERR.puts 'forked'
+begin
+ # daemonize
+ # Process.daemon
+ # clean up any sockets left behind
+ if File.exist? IPC_Server::HIJACK_SOCKET
+ begin
+ UNIXSocket.new(IPC_Server::HIJACK_SOCKET).close
+ rescue Errno::ECONNREFUSED
+ File.unlink IPC_Server::HIJACK_SOCKET
+ end
+ end
+
+ # try not to leave any sockets behind
+ at_exit do
+ STDERR.puts 'exiting'
+ begin
+ File.unlink IPC_Server::HIJACK_SOCKET
+ File.unlink DRB_SOCKET.split('//').last
+ rescue
+ end
+ end
+
+ # exit on signal
+ Signal.trap('INT') {sleep 1; STDERR.puts 'bye'; exit}
+ Signal.trap('TERM') {STDERR.puts 'bye'; exit}
+ Signal.trap('USR2') {STDERR.puts 'bye'; exit}
+
+ # event code
require_relative 'events'
begin
- DRb.start_service(IPC_Server::SOCKET, EventService)
+ # start IPC connection to EventService
+ DRb.start_service(IPC_Server::DRB_SOCKET, EventService)
+
+ # listen for hijacked sockets
+ STDERR.puts 'starting...'
+ listener = UNIXServer.new(IPC_Server::HIJACK_SOCKET)
+ STDERR.puts 'listening...'
+
+ loop do
+ Thread.start(listener.accept) do |client|
+ STDERR.puts 'got something...'
+ # Receive message, socket, pass to EventService
+ msg, sockaddr, rflags, *controls = client.recvmsg(scm_rights: true)
+ ancdata = controls.find {|ancdata| ancdata.cmsg_is?(:SOCKET, :RIGHTS)}
+ client.close
+ STDERR.puts msg
+ STDERR.puts ancdata
+ begin
+ EventService.hijack(msg, ancdata.unix_rights[0]) if ancdata
+ rescue Exception => e
+ STDERR.puts e
+ end
+ end
+ end
+
DRb.thread.join
rescue Errno::EADDRINUSE => e
exit
end
+rescue Exception => e
+ STDERR.puts e
+ STDERR.puts e.backtrace
+end
end
else
# IPC client
- IPC = IPC_Server.new(DRbObject.new(nil, IPC_Server::SOCKET))
+ IPC = IPC_Server.new(DRbObject.new(nil, IPC_Server::DRB_SOCKET))
end
diff --git a/www/board/agenda/routes.rb b/www/board/agenda/routes.rb
index b24deac..58e14c2 100755
--- a/www/board/agenda/routes.rb
+++ b/www/board/agenda/routes.rb
@@ -202,26 +202,24 @@
# event stream for server sent events (a.k.a EventSource)
get '/events', provides: 'text/event-stream' do
- stream :keep_open do |out|
- subscription = IPC.subscribe(env.user)
- out.callback {IPC.unsubscribe(subscription)}
-
- loop do
- event = IPC.pop(subscription)
- if Hash === event or Array === event
- out << "data: #{JSON.dump(event)}\n\n"
- elsif event == :heartbeat
- out << ":\n"
- elsif event == :exit
- out.close
- break
- elsif event == nil
- subscription = IPC.subscribe(env.user)
- else
- out << "data: #{event.inspect}\n\n"
- end
- end
- end
+ # hijack io socket
+ env['rack.hijack'].call
+ io = env['rack.hijack_io']
+
+ # send HTTP headers
+ io.write "HTTP/1.1 200 OK\r\n"
+ io.write "Content-Type: text/event-stream\r\n"
+ io.write "\r\n"
+ io.flush
+
+ # hand off socket to event server
+ socket = UNIXSocket.new(IPC_Server::HIJACK_SOCKET)
+ socket.sendmsg env.user, 0, nil,
+ Socket::AncillaryData.int(:UNIX, :SOCKET, :RIGHTS, io.fileno)
+ socket.close
+
+ # for logging purposes
+ "event stream"
end
# draft minutes