You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2018/03/23 15:08:55 UTC
[1/8] qpid-proton git commit: NO-JIRA: [cpp] remove out-of-date FIXME
comment.
Repository: qpid-proton
Updated Branches:
refs/heads/master fbfdeb784 -> aa8d37272
NO-JIRA: [cpp] remove out-of-date FIXME comment.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/a65a1751
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/a65a1751
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/a65a1751
Branch: refs/heads/master
Commit: a65a17517cadc62a629f004e6ba5a9ac60cf5f71
Parents: fbfdeb7
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Mar 22 13:19:25 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Fri Mar 23 09:27:41 2018 -0400
----------------------------------------------------------------------
proton-c/bindings/cpp/src/container_test.cpp | 1 -
1 file changed, 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a65a1751/proton-c/bindings/cpp/src/container_test.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/container_test.cpp b/proton-c/bindings/cpp/src/container_test.cpp
index 8f9075f..5bfaa28 100644
--- a/proton-c/bindings/cpp/src/container_test.cpp
+++ b/proton-c/bindings/cpp/src/container_test.cpp
@@ -364,7 +364,6 @@ int test_container_mt_stop() {
return 0;
}
-// FIXME aconway 2018-01-04: test busy stop from other thread
#endif
} // namespace
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/8] qpid-proton git commit: NO-JIRA: [ruby] Removed unused Timeout
module.
Posted by ac...@apache.org.
NO-JIRA: [ruby] Removed unused Timeout module.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/1108c4e4
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/1108c4e4
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/1108c4e4
Branch: refs/heads/master
Commit: 1108c4e47a439f630d72127f6b00fdd9003cb9b7
Parents: d65528c
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Mar 22 14:46:06 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Fri Mar 23 09:39:39 2018 -0400
----------------------------------------------------------------------
proton-c/bindings/ruby/lib/qpid_proton.rb | 1 -
proton-c/bindings/ruby/lib/util/timeout.rb | 49 --------------------
.../ruby/spec/exception_handling_spec.rb | 2 +-
3 files changed, 1 insertion(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1108c4e4/proton-c/bindings/ruby/lib/qpid_proton.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/qpid_proton.rb b/proton-c/bindings/ruby/lib/qpid_proton.rb
index c446a79..c52310b 100644
--- a/proton-c/bindings/ruby/lib/qpid_proton.rb
+++ b/proton-c/bindings/ruby/lib/qpid_proton.rb
@@ -45,7 +45,6 @@ require "util/deprecation"
require "util/version"
require "util/error_handler"
require "util/wrapper"
-require "util/timeout"
# Types
require "types/type"
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1108c4e4/proton-c/bindings/ruby/lib/util/timeout.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/util/timeout.rb b/proton-c/bindings/ruby/lib/util/timeout.rb
deleted file mode 100644
index 5e299dd..0000000
--- a/proton-c/bindings/ruby/lib/util/timeout.rb
+++ /dev/null
@@ -1,49 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-
-module Qpid::Proton::Util
-
- # Provides methods for converting between milliseconds, seconds
- # and timeout values.
- #
- # @private
- module Timeout
-
- def sec_to_millis(s)
- return (s * 1000).to_int
- end
-
- def millis_to_sec(ms)
- return (ms.to_f / 1000.0).to_int
- end
-
- def timeout_to_millis(s)
- return Cproton::PN_MILLIS_MAX if s.nil?
-
- return sec_to_millis(s)
- end
-
- def millis_to_timeout(ms)
- return nil if ms == Cproton::PN_MILLIS_MAX
-
- return millis_to_sec(ms)
- end
-
- end
-
-end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1108c4e4/proton-c/bindings/ruby/spec/exception_handling_spec.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/spec/exception_handling_spec.rb b/proton-c/bindings/ruby/spec/exception_handling_spec.rb
index acfb55e..3d65b40 100644
--- a/proton-c/bindings/ruby/spec/exception_handling_spec.rb
+++ b/proton-c/bindings/ruby/spec/exception_handling_spec.rb
@@ -70,7 +70,7 @@ module Qpid
}.must_raise(Qpid::Proton::ArgumentError)
end
- it "raises Timeout on PN_TIMEOUT" do
+ it "raises TimeoutError on PN_TIMEOUT" do
proc {
@handler.check_for_error(Qpid::Proton::Error::TIMEOUT)
}.must_raise(Qpid::Proton::TimeoutError)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[4/8] qpid-proton git commit: NO-JIRA: [ruby] move yard/options to
standard location .yardopts
Posted by ac...@apache.org.
NO-JIRA: [ruby] move yard/options to standard location .yardopts
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/1f682a6c
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/1f682a6c
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/1f682a6c
Branch: refs/heads/master
Commit: 1f682a6ceff7d8cf6e7c21f7112019d3b79de995
Parents: 5a34fda
Author: Alan Conway <ac...@redhat.com>
Authored: Wed Mar 21 14:01:01 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Fri Mar 23 09:39:39 2018 -0400
----------------------------------------------------------------------
proton-c/bindings/ruby/.yardopts | 8 ++++++++
proton-c/bindings/ruby/CMakeLists.txt | 2 +-
proton-c/bindings/ruby/yard/options | 8 --------
3 files changed, 9 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1f682a6c/proton-c/bindings/ruby/.yardopts
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/.yardopts b/proton-c/bindings/ruby/.yardopts
new file mode 100644
index 0000000..166989d
--- /dev/null
+++ b/proton-c/bindings/ruby/.yardopts
@@ -0,0 +1,8 @@
+--readme README.rdoc
+--quiet
+--no-progress
+--no-private
+--default-return ""
+--hide-void-return
+--template default
+--api qpid
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1f682a6c/proton-c/bindings/ruby/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/CMakeLists.txt b/proton-c/bindings/ruby/CMakeLists.txt
index 22b0be0..d8f9a44 100644
--- a/proton-c/bindings/ruby/CMakeLists.txt
+++ b/proton-c/bindings/ruby/CMakeLists.txt
@@ -144,7 +144,7 @@ if (YARD_EXE)
add_custom_command(
OUTPUT ${bin}/doc
WORKING_DIRECTORY ${src}
- COMMAND ${YARD_EXE} -o ${bin}/doc -b ${bin}/.yardoc --yardopts ${src}/yard/options -p ${src}/yard/templates
+ COMMAND ${YARD_EXE} -o ${bin}/doc -b ${bin}/.yardoc --yardopts ${src}/.yardopts -p ${src}/yard/templates
DEPENDS ${RUBY_SRC}
)
add_custom_target(docs-ruby DEPENDS ${bin}/doc)
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1f682a6c/proton-c/bindings/ruby/yard/options
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/yard/options b/proton-c/bindings/ruby/yard/options
deleted file mode 100644
index 166989d..0000000
--- a/proton-c/bindings/ruby/yard/options
+++ /dev/null
@@ -1,8 +0,0 @@
---readme README.rdoc
---quiet
---no-progress
---no-private
---default-return ""
---hide-void-return
---template default
---api qpid
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[3/8] qpid-proton git commit: NO-JIRA: [ruby] Clean up 'ruby -W2'
warnings
Posted by ac...@apache.org.
NO-JIRA: [ruby] Clean up 'ruby -W2' warnings
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/5a34fda9
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/5a34fda9
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/5a34fda9
Branch: refs/heads/master
Commit: 5a34fda9d3844d27c8ef2349246e930e66299593
Parents: a65a175
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Mar 22 10:28:16 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Fri Mar 23 09:39:39 2018 -0400
----------------------------------------------------------------------
proton-c/bindings/ruby/lib/core/container.rb | 1 +
proton-c/bindings/ruby/lib/core/transport.rb | 1 +
proton-c/bindings/ruby/lib/types/array.rb | 1 +
proton-c/bindings/ruby/tests/test_container.rb | 4 +++-
proton-c/bindings/ruby/tests/test_tools.rb | 1 -
5 files changed, 6 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5a34fda9/proton-c/bindings/ruby/lib/core/container.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/container.rb b/proton-c/bindings/ruby/lib/core/container.rb
index d4fe5d1..f8ff032 100644
--- a/proton-c/bindings/ruby/lib/core/container.rb
+++ b/proton-c/bindings/ruby/lib/core/container.rb
@@ -205,6 +205,7 @@ module Qpid::Proton
# concurrently.
#
def initialize(*args)
+ @handler, @id, @panic = nil
case args.size
when 2 then @handler, @id = args
when 1 then
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5a34fda9/proton-c/bindings/ruby/lib/core/transport.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/transport.rb b/proton-c/bindings/ruby/lib/core/transport.rb
index e629b0c..6a6c4f5 100644
--- a/proton-c/bindings/ruby/lib/core/transport.rb
+++ b/proton-c/bindings/ruby/lib/core/transport.rb
@@ -170,6 +170,7 @@ module Qpid::Proton
# Creates a new transport instance.
def initialize(impl = Cproton.pn_transport)
@impl = impl
+ @ssl = nil
self.class.store_instance(self, :pn_transport_attachments)
end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5a34fda9/proton-c/bindings/ruby/lib/types/array.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/types/array.rb b/proton-c/bindings/ruby/lib/types/array.rb
index d7e15e6..62d9274 100644
--- a/proton-c/bindings/ruby/lib/types/array.rb
+++ b/proton-c/bindings/ruby/lib/types/array.rb
@@ -49,6 +49,7 @@ module Qpid::Proton
# @param descriptor [Object] Optional array descriptor
def initialize(type, elements=nil, descriptor=nil)
@type, @descriptor = type, descriptor
+ @proton_array_header = nil
raise ArgumentError, "no type specified for array" if @type.nil?
super elements if elements
end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5a34fda9/proton-c/bindings/ruby/tests/test_container.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/tests/test_container.rb b/proton-c/bindings/ruby/tests/test_container.rb
index a0c32ed..faa505a 100644
--- a/proton-c/bindings/ruby/tests/test_container.rb
+++ b/proton-c/bindings/ruby/tests/test_container.rb
@@ -32,6 +32,8 @@ class ContainerTest < MiniTest::Test
send_handler = Class.new(ExceptionMessagingHandler) do
attr_reader :accepted, :sent
+ def initialize() @sent, @accepted = nil; end
+
def on_sendable(sender)
unless @sent
m = Message.new("hello")
@@ -152,7 +154,7 @@ class ContainerTest < MiniTest::Test
def test_bad_host
cont = Container.new(__method__)
assert_raises (SocketError) { cont.listen("badlisten.example.com:999") }
- assert_raises (SocketError) { c = cont.connect("badconnect.example.com:999") }
+ assert_raises (SocketError) { cont.connect("badconnect.example.com:999") }
end
# Verify that connection options are sent to the peer
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5a34fda9/proton-c/bindings/ruby/tests/test_tools.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/tests/test_tools.rb b/proton-c/bindings/ruby/tests/test_tools.rb
index 911669b..091322d 100644
--- a/proton-c/bindings/ruby/tests/test_tools.rb
+++ b/proton-c/bindings/ruby/tests/test_tools.rb
@@ -105,7 +105,6 @@ end
# Add port/url to Listener, assuming a TCP socket
class Qpid::Proton::Listener
- def port() to_io.addr[1]; end
def url() "amqp://:#{port}"; end
end
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[5/8] qpid-proton git commit: PROTON-1803: [ruby] Container support
for scheduled tasks
Posted by ac...@apache.org.
PROTON-1803: [ruby] Container support for scheduled tasks
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/02f49551
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/02f49551
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/02f49551
Branch: refs/heads/master
Commit: 02f495510cb9f5e5308393fe8bb8e44df8ecebcd
Parents: 1108c4e
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Mar 22 16:01:05 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Fri Mar 23 09:39:39 2018 -0400
----------------------------------------------------------------------
proton-c/bindings/ruby/lib/core/container.rb | 115 ++++++++++++++++----
proton-c/bindings/ruby/lib/qpid_proton.rb | 1 +
proton-c/bindings/ruby/lib/util/schedule.rb | 63 +++++++++++
proton-c/bindings/ruby/tests/test_container.rb | 40 ++++++-
4 files changed, 194 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/02f49551/proton-c/bindings/ruby/lib/core/container.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/container.rb b/proton-c/bindings/ruby/lib/core/container.rb
index 2d920b4..c581d34 100644
--- a/proton-c/bindings/ruby/lib/core/container.rb
+++ b/proton-c/bindings/ruby/lib/core/container.rb
@@ -21,6 +21,8 @@ require 'set'
require_relative 'listener'
module Qpid::Proton
+ public
+
# An AMQP container manages a set of {Listener}s and {Connection}s which
# contain {#Sender} and {#Receiver} links to transfer messages. Usually, each
# AMQP client or server process has a single container for all of its
@@ -29,6 +31,8 @@ module Qpid::Proton
# One or more threads can call {#run}, events generated by all the listeners and
# connections will be dispatched in the {#run} threads.
class Container
+ include TimeCompare
+
# Error raised if the container is used after {#stop} has been called.
class StoppedError < RuntimeError
def initialize(*args) super("container has been stopped"); end
@@ -62,16 +66,17 @@ module Qpid::Proton
# Implementation note:
#
- # - #run threads take work from @work
- # - Each driver and the Container itself is processed by at most one #run thread at a time
- # - The Container thread does IO.select
+ # - #run threads take work items from @work, process them, and rearm them for select
+ # - work items are: ConnectionTask, ListenTask, :start, :select, :schedule
# - nil on the @work queue makes a #run thread exit
@work = Queue.new
@work << :start
- @work << self # Issue start and start start selecting
+ @work << :select
@wake = SelectWaker.new # Wakes #run thread in IO.select
@auto_stop = true # Stop when @active drops to 0
+ @schedule = Schedule.new
+ @schedule_working = false # True if :schedule is on the work queue
# Following instance variables protected by lock
@lock = Mutex.new
@@ -234,6 +239,17 @@ module Qpid::Proton
@wake.wake
end
+ # Schedule code to be executed after a delay.
+ # @param delay [Numeric] delay in seconds, must be >= 0
+ # @yield [ ] the block is invoked with no parameters in a {#run} thread after +delay+ has elapsed
+ def schedule(delay, &block)
+ delay >= 0.0 or raise ArgumentError, "delay=#{delay} must be >= 0"
+ block_given? or raise ArgumentError, "no block given"
+ not_stopped
+ @lock.synchronize { @active += 1 } if @schedule.add(Time.now + delay, &block)
+ @wake.wake
+ end
+
private
# Container driver applies options and adds container context to events
@@ -288,6 +304,8 @@ module Qpid::Proton
STDERR.puts "#{@log_prefix}#{([method[3..-1].upcase]+args).join ', '})" if @log_prefix
@handler.__send__(method, self, *args) if @handler && @handler.respond_to?(method)
end
+
+ def next_tick() nil; end
end
# Selectable object that can be used to wake IO.select from another thread
@@ -329,35 +347,44 @@ module Qpid::Proton
when :start
@adapter.on_container_start(self) if @adapter.respond_to? :on_container_start
- when Container
+ when :select
+ # Compute read/write select sets and minimum next_tick for select timeout
r, w = [@wake], []
- next_tick = nil
+ next_tick = @schedule.next_tick
@lock.synchronize do
@selectable.each do |s|
r << s if s.send :can_read?
w << s if s.send :can_write?
- next_tick = next_tick_min(s, next_tick)
+ next_tick = earliest(s.next_tick, next_tick)
end
end
+
now = Time.now
timeout = ((next_tick > now) ? next_tick - now : 0) if next_tick
r, w = IO.select(r, w, nil, timeout)
now = Time.now
- selected = Set.new(r).delete(@wake)
+ @wake.reset if r && r.delete(@wake)
+
+ # selected is a Set to eliminate duplicates between r, w and next_tick due.
+ selected = Set.new
+ selected.merge(r) if r
selected.merge(w) if w
- selected.merge(@selectable.select { |s| next_tick_due(s, now) })
- @wake.reset
- stop_select = nil
@lock.synchronize do
- if stop_select = @stopped # close everything
- selected += @selectable
- selected.each { |s| s.close @stop_err }
+ if @stopped # close everything
+ @selectable.each { |s| s.close @stop_err; @work << s }
+ @selectable.clear
@wake.close
+ return
end
- @selectable -= selected # Remove selected tasks
+ if !@schedule_working && before_eq(@schedule.next_tick, now)
+ @schedule_working = true
+ @work << :schedule
+ end
+ selected.merge(@selectable.select { |s| before_eq(s.next_tick, now) })
+ @selectable -= selected # Remove selected tasks from @selectable
end
selected.each { |s| @work << s } # Queue up tasks needing #process
- @work << self unless stop_select
+ @work << :select # Enable next select
when ConnectionTask then
maybe_panic { task.process }
@@ -367,17 +394,56 @@ module Qpid::Proton
io, opts = maybe_panic { task.process }
add(connection_driver(io, opts, true)) if io
rearm task
- end
- end
- def next_tick_due(x, now)
- nt = x.respond_to?(:next_tick) && x.next_tick
- nt && (nt <= now)
+ when :schedule then
+ if maybe_panic { @schedule.process Time.now }
+ @lock.synchronize { @active -= 1; check_stop_lh }
+ else
+ @lock.synchronize { @schedule_working = false }
+ end
+ end
end
- def next_tick_min(x, t)
- nt = x.respond_to?(:next_tick) && x.next_tick
- nt if !t || (nt < t)
+ def do_select
+ # Compute the sets to select for read and write, and the minimum next_tick for the timeout
+ r, w = [@wake], []
+ next_tick = nil
+ @lock.synchronize do
+ @selectable.each do |s|
+ r << s if s.can_read?
+ w << s if s.can_write?
+ next_tick = earliest(s.next_tick, next_tick)
+ end
+ end
+ next_tick = earliest(@schedule.next_tick, next_tick)
+
+ # Do the select and queue up all resulting work
+ now = Time.now
+ timeout = next_tick - now if next_tick
+ r, w = (timeout.nil? || timeout > 0) && IO.select(r, w, nil, timeout)
+ @wake.reset
+ selected = Set.new
+ @lock.synchronize do
+ if @stopped
+ @selectable.each { |s| s.close @stop_err; @work << s }
+ @wake.close
+ return
+ end
+ # Check if schedule has items due and is not already working
+ if !@schedule_working && before_eq(@schedule.next_tick, now)
+ @work << :schedule
+ @schedule_working = true
+ end
+ # Eliminate duplicates between r, w and next_tick due.
+ selected.merge(r) if r
+ selected.delete(@wake)
+ selected.merge(w) if w
+ @selectable -= selected
+ selected.merge(@selectable.select { |s| before_eq(s.next_tick, now) })
+ @selectable -= selected
+ end
+ selected.each { |s| @work << s } # Queue up tasks needing #process
+ @work << :select
end
# Rescue any exception raised by the block and stop the container.
@@ -386,6 +452,7 @@ module Qpid::Proton
yield
rescue Exception => e
stop(nil, e)
+ nil
end
end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/02f49551/proton-c/bindings/ruby/lib/qpid_proton.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/qpid_proton.rb b/proton-c/bindings/ruby/lib/qpid_proton.rb
index c52310b..21a15b9 100644
--- a/proton-c/bindings/ruby/lib/qpid_proton.rb
+++ b/proton-c/bindings/ruby/lib/qpid_proton.rb
@@ -44,6 +44,7 @@ require "core/exceptions"
require "util/deprecation"
require "util/version"
require "util/error_handler"
+require "util/schedule"
require "util/wrapper"
# Types
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/02f49551/proton-c/bindings/ruby/lib/util/schedule.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/util/schedule.rb b/proton-c/bindings/ruby/lib/util/schedule.rb
new file mode 100644
index 0000000..ef80c1b
--- /dev/null
+++ b/proton-c/bindings/ruby/lib/util/schedule.rb
@@ -0,0 +1,63 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+module Qpid::Proton
+
+ # @private
+ module TimeCompare
+ # t1 <= t2, where nil is treated as "distant future"
+ def before_eq(t1, t2) (t1 && t2) ? (t1 <= t2) : t1; end
+
+ # min(t1, t2) where nil is treated as "distant future"
+ def earliest(t1, t2) before_eq(t1, t2) ? t1 : t2; end
+ end
+
+ # @private
+ # A sorted, thread-safe list of scheduled Proc.
+ # Note: calls to #process are always serialized, but calls to #add may be concurrent.
+ class Schedule
+ include TimeCompare
+ Item = Struct.new(:time, :proc)
+
+ def initialize() @lock = Mutex.new; @items = []; end
+
+ def next_tick() @lock.synchronize { @items.empty? ? nil : @items.first.time } end
+
+ # @return true if the Schedule was previously empty
+ def add(time, &proc)
+ @lock.synchronize do
+ if at = (0...@items.size).bsearch { |i| @items[i].time > time }
+ @items.insert(at, Item.new(time, proc))
+ else
+ @items << Item.new(time, proc)
+ end
+ return @items.size == 1
+ end
+ end
+
+ # @return true if the Schedule became empty as a result of this call
+ def process(now)
+ due = []
+ empty = @lock.synchronize do
+ due << @items.shift while !@items.empty? && before_eq(@items.first.time, now)
+ @items.empty?
+ end
+ due.each { |i| i.proc.call() }
+ return empty && !due.empty?
+ end
+ end
+end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/02f49551/proton-c/bindings/ruby/tests/test_container.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/tests/test_container.rb b/proton-c/bindings/ruby/tests/test_container.rb
index faa505a..c9f54cc 100644
--- a/proton-c/bindings/ruby/tests/test_container.rb
+++ b/proton-c/bindings/ruby/tests/test_container.rb
@@ -300,5 +300,43 @@ class ContainerTest < MiniTest::Test
assert_raises(Container::StoppedError) { cont.run }
assert_raises(Container::StoppedError) { cont.listen "" }
end
-end
+ # Make sure Schedule puts tasks in proper order.
+ def test_scheduler
+ a = []
+ s = Schedule.new
+
+ assert_equal true, s.add(Time.at 3) { a << 3 }
+ assert_equal false, s.process(Time.at 2) # Should not run
+ assert_equal [], a
+ assert_equal true, s.process(Time.at 3) # Should run
+ assert_equal [3], a
+
+ a = []
+ assert_equal true, s.add(Time.at 3) { a << 3 }
+ assert_equal false, s.add(Time.at 5) { a << 5 }
+ assert_equal false, s.add(Time.at 1) { a << 1 }
+ assert_equal false, s.add(Time.at 4) { a << 4 }
+ assert_equal false, s.add(Time.at 4) { a << 4.1 }
+ assert_equal false, s.add(Time.at 4) { a << 4.2 }
+ assert_equal false, s.process(Time.at 4)
+ assert_equal [1, 3, 4, 4.1, 4.2], a
+ a = []
+ assert_equal true, s.process(Time.at 5)
+ assert_equal [5], a
+ end
+
+ def test_container_schedule
+ c = Container.new __method__
+ delays = [0.1, 0.03, 0.02, 0.04]
+ a = []
+ delays.each { |d| c.schedule(d) { a << [d, Time.now] } }
+ start = Time.now
+ c.run
+ delays.sort.each do |d|
+ x = a.shift
+ assert_equal d, x[0]
+ assert_in_delta start + d, x[1], 0.01, "#{d}"
+ end
+ end
+end
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[7/8] qpid-proton git commit: NO-JIRA: [ruby] Put external RUBYLIB at
end of path for tests
Posted by ac...@apache.org.
NO-JIRA: [ruby] Put external RUBYLIB at end of path for tests
Avoid picking up installed proton packages when testing.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/d37c32c1
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/d37c32c1
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/d37c32c1
Branch: refs/heads/master
Commit: d37c32c1d631a36574f07800e8f4443e91f1a9d5
Parents: 1f682a6
Author: Alan Conway <ac...@redhat.com>
Authored: Wed Mar 21 14:17:15 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Fri Mar 23 09:39:39 2018 -0400
----------------------------------------------------------------------
proton-c/bindings/ruby/CMakeLists.txt | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d37c32c1/proton-c/bindings/ruby/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/CMakeLists.txt b/proton-c/bindings/ruby/CMakeLists.txt
index d8f9a44..f6f95f7 100644
--- a/proton-c/bindings/ruby/CMakeLists.txt
+++ b/proton-c/bindings/ruby/CMakeLists.txt
@@ -105,8 +105,8 @@ install(DIRECTORY lib/ DESTINATION ${RUBY_ARCHLIB_DIR} COMPONENT Ruby)
## Tests
-to_native_path("$ENV{RUBYLIB};${src}/lib;${src}/tests;${src}/spec;${bin};${c_lib_dir}" RUBYLIB)
-to_native_path("$ENV{PATH};${bin};${c_lib_dir}" PATH)
+to_native_path("${src}/lib;${src}/tests;${src}/spec;${bin};${c_lib_dir};$ENV{RUBYLIB}" RUBYLIB)
+to_native_path("${bin};${c_lib_dir};$ENV{PATH}" PATH)
execute_process(COMMAND ${RUBY_EXECUTABLE} -r minitest -e ""
RESULT_VARIABLE result OUTPUT_QUIET ERROR_QUIET)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[8/8] qpid-proton git commit: PROTON-1778: [ruby] thread safe
work_queue
Posted by ac...@apache.org.
PROTON-1778: [ruby] thread safe work_queue
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/aa8d3727
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/aa8d3727
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/aa8d3727
Branch: refs/heads/master
Commit: aa8d372723ec3fe8de0d16c165ed5944b65c40f0
Parents: 02f4955
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Mar 22 17:19:45 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Fri Mar 23 11:08:27 2018 -0400
----------------------------------------------------------------------
proton-c/bindings/ruby/lib/core/connection.rb | 4 ++
proton-c/bindings/ruby/lib/core/container.rb | 33 ++++++++----
proton-c/bindings/ruby/lib/core/endpoint.rb | 3 ++
proton-c/bindings/ruby/lib/core/work_queue.rb | 59 +++++++++++++++++++++
proton-c/bindings/ruby/lib/util/schedule.rb | 24 +++++++--
proton-c/bindings/ruby/tests/test_container.rb | 27 +++++++++-
6 files changed, 135 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/aa8d3727/proton-c/bindings/ruby/lib/core/connection.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/connection.rb b/proton-c/bindings/ruby/lib/core/connection.rb
index 327be10..c0d161e 100644
--- a/proton-c/bindings/ruby/lib/core/connection.rb
+++ b/proton-c/bindings/ruby/lib/core/connection.rb
@@ -288,6 +288,10 @@ module Qpid::Proton
@link_prefix + "/" + (@link_count += 1).to_s(32)
end
+ # @return [WorkQueue] work queue for code that should be run in the thread
+ # context for this connection
+ attr_reader :work_queue
+
protected
def _local_condition
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/aa8d3727/proton-c/bindings/ruby/lib/core/container.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/container.rb b/proton-c/bindings/ruby/lib/core/container.rb
index c581d34..78f8013 100644
--- a/proton-c/bindings/ruby/lib/core/container.rb
+++ b/proton-c/bindings/ruby/lib/core/container.rb
@@ -19,6 +19,7 @@
require 'thread'
require 'set'
require_relative 'listener'
+require_relative 'work_queue'
module Qpid::Proton
public
@@ -193,7 +194,7 @@ module Qpid::Proton
raise StoppedError if @stopped
end
while task = @work.pop
- run_one task
+ run_one(task, Time.now)
end
raise @panic if @panic
ensure
@@ -242,23 +243,36 @@ module Qpid::Proton
# Schedule code to be executed after a delay.
# @param delay [Numeric] delay in seconds, must be >= 0
# @yield [ ] the block is invoked with no parameters in a {#run} thread after +delay+ has elapsed
- def schedule(delay, &block)
- delay >= 0.0 or raise ArgumentError, "delay=#{delay} must be >= 0"
- block_given? or raise ArgumentError, "no block given"
+ # @return [void]
+ # @raise [ThreadError] if +non_block+ is true and the operation would block
+ def schedule(delay, non_block=false, &block)
not_stopped
- @lock.synchronize { @active += 1 } if @schedule.add(Time.now + delay, &block)
+ @lock.synchronize { @active += 1 } if @schedule.add(Time.now + delay, non_block, &block)
@wake.wake
end
private
+ def wake() @wake.wake; end
+
# Container driver applies options and adds container context to events
class ConnectionTask < Qpid::Proton::HandlerDriver
+ include TimeCompare
+
def initialize container, io, opts, server=false
super io, opts[:handler]
transport.set_server if server
transport.apply opts
connection.apply opts
+ @work_queue = WorkQueue.new container
+ connection.instance_variable_set(:@work_queue, @work_queue)
+ end
+ def next_tick() earliest(super, @work_queue.send(:next_tick)); end
+ def process(now) @work_queue.send(:process, now); super(); end
+
+ def dispatch # Intercept dispatch to close work_queue
+ super
+ @work_queue.send(:close) if read_closed? && write_closed?
end
end
@@ -341,7 +355,7 @@ module Qpid::Proton
end
# Handle a single item from the @work queue, this is the heart of the #run loop.
- def run_one(task)
+ def run_one(task, now)
case task
when :start
@@ -359,10 +373,9 @@ module Qpid::Proton
end
end
- now = Time.now
timeout = ((next_tick > now) ? next_tick - now : 0) if next_tick
r, w = IO.select(r, w, nil, timeout)
- now = Time.now
+ now = Time.now unless timeout == 0
@wake.reset if r && r.delete(@wake)
# selected is a Set to eliminate duplicates between r, w and next_tick due.
@@ -387,7 +400,7 @@ module Qpid::Proton
@work << :select # Enable next select
when ConnectionTask then
- maybe_panic { task.process }
+ maybe_panic { task.process now }
rearm task
when ListenTask then
@@ -396,7 +409,7 @@ module Qpid::Proton
rearm task
when :schedule then
- if maybe_panic { @schedule.process Time.now }
+ if maybe_panic { @schedule.process now }
@lock.synchronize { @active -= 1; check_stop_lh }
else
@lock.synchronize { @schedule_working = false }
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/aa8d3727/proton-c/bindings/ruby/lib/core/endpoint.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/endpoint.rb b/proton-c/bindings/ruby/lib/core/endpoint.rb
index 86d8efe..77372a0 100644
--- a/proton-c/bindings/ruby/lib/core/endpoint.rb
+++ b/proton-c/bindings/ruby/lib/core/endpoint.rb
@@ -67,6 +67,9 @@ module Qpid::Proton
self.connection.transport
end
+ # @return [WorkQueue] the work queue for work on this endpoint.
+ def work_queue() connection.work_queue; end
+
# @private
# @return [Bool] true if {#state} has all the bits of `mask` set
def check_state(mask) (self.state & mask) == mask; end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/aa8d3727/proton-c/bindings/ruby/lib/core/work_queue.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/work_queue.rb b/proton-c/bindings/ruby/lib/core/work_queue.rb
new file mode 100644
index 0000000..d08d883
--- /dev/null
+++ b/proton-c/bindings/ruby/lib/core/work_queue.rb
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+module Qpid::Proton
+
+ # A queue of work items to be executed, possibly in a different thread.
+ class WorkQueue
+
+ # Add code to be executed by the WorkQueue immediately.
+ # @param non_block [Boolean] if true raise {ThreadError} if the operation would block.
+ # @yield [ ] the block will be invoked with no parameters in the {WorkQueue} context,
+ # which may be a different thread.
+ # @return [void]
+ # @raise [ThreadError] if +non_block+ is true and the operation would block
+ # @raise [EOFError] if the queue is closed and cannot accept more work
+ def add(non_block=false, &block)
+ @schedule.add(Time.at(0), non_block, &block)
+ @container.send :wake
+ end
+
+ # Schedule work to be executed by the WorkQueue after a delay.
+ # Note that tasks scheduled to run after the WorkQueue closes will be silently dropped
+ #
+ # @param delay delay in seconds until the block is added to the queue.
+ # @param (see #add)
+ # @yield (see #add)
+ # @return [void]
+ # @raise (see #add)
+ def schedule(delay, non_block=false, &block)
+ @schedule.add(Time.now + delay, non_block, &block)
+ @container.send :wake
+ end
+
+ private
+
+ def initialize(container)
+ @schedule = Schedule.new
+ @container = container
+ end
+
+ def close() @schedule.close; end
+ def process(now) @schedule.process(now); end
+ def next_tick() @schedule.next_tick; end
+ end
+end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/aa8d3727/proton-c/bindings/ruby/lib/util/schedule.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/util/schedule.rb b/proton-c/bindings/ruby/lib/util/schedule.rb
index ef80c1b..f1bfb36 100644
--- a/proton-c/bindings/ruby/lib/util/schedule.rb
+++ b/proton-c/bindings/ruby/lib/util/schedule.rb
@@ -33,13 +33,23 @@ module Qpid::Proton
include TimeCompare
Item = Struct.new(:time, :proc)
- def initialize() @lock = Mutex.new; @items = []; end
+ def initialize()
+ @lock = Mutex.new
+ @items = []
+ @closed = false
+ end
- def next_tick() @lock.synchronize { @items.empty? ? nil : @items.first.time } end
+ def next_tick()
+ @lock.synchronize { @items.first.time unless @items.empty? }
+ end
# @return true if the Schedule was previously empty
- def add(time, &proc)
+ # @raise EOFError if schedule is closed
+ # @raise ThreadError if +non_block+ and operation would block
+ def add(time, non_block=false, &proc)
+ # non_block ignored for now, but we may implement a bounded schedule in future.
@lock.synchronize do
+ raise EOFError if @closed
if at = (0...@items.size).bsearch { |i| @items[i].time > time }
@items.insert(at, Item.new(time, proc))
else
@@ -53,11 +63,17 @@ module Qpid::Proton
def process(now)
due = []
empty = @lock.synchronize do
- due << @items.shift while !@items.empty? && before_eq(@items.first.time, now)
+ due << @items.shift while (!@items.empty? && before_eq(@items.first.time, now))
@items.empty?
end
due.each { |i| i.proc.call() }
return empty && !due.empty?
end
+
+ # #add raises EOFError after #close.
+ # #process can still be called to drain the schedule.
+ def close()
+ @lock.synchronize { @closed = true }
+ end
end
end
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/aa8d3727/proton-c/bindings/ruby/tests/test_container.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/tests/test_container.rb b/proton-c/bindings/ruby/tests/test_container.rb
index c9f54cc..7f89205 100644
--- a/proton-c/bindings/ruby/tests/test_container.rb
+++ b/proton-c/bindings/ruby/tests/test_container.rb
@@ -336,7 +336,32 @@ class ContainerTest < MiniTest::Test
delays.sort.each do |d|
x = a.shift
assert_equal d, x[0]
- assert_in_delta start + d, x[1], 0.01, "#{d}"
+ assert_in_delta start + d, x[1], 0.01
end
end
+
+ def test_work_queue
+ cont = ServerContainer.new(__method__, {}, 1)
+ c = cont.connect(cont.url)
+ t = Thread.new { cont.run }
+ q = Queue.new
+
+ start = Time.now
+ c.work_queue.schedule(0.02) { q << [3, Thread.current] }
+ c.work_queue.add { q << [1, Thread.current] }
+ c.work_queue.schedule(0.04) { q << [4, Thread.current] }
+ c.work_queue.add { q << [2, Thread.current] }
+
+ assert_equal [1, t], q.pop
+ assert_equal [2, t], q.pop
+ assert_in_delta 0.0, Time.now - start, 0.01
+ assert_equal [3, t], q.pop
+ assert_in_delta 0.02, Time.now - start, 0.01
+ assert_equal [4, t], q.pop
+ assert_in_delta 0.04, Time.now - start, 0.01
+
+ c.work_queue.add { c.close }
+ t.join
+ assert_raises(EOFError) { c.work_queue.add { } }
+ end
end
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[6/8] qpid-proton git commit: NO-JIRA: [ruby] Re-organize Container
methods public first.
Posted by ac...@apache.org.
NO-JIRA: [ruby] Re-organize Container methods public first.
Remove protected section, no longer needed
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/d65528c0
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/d65528c0
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/d65528c0
Branch: refs/heads/master
Commit: d65528c01cc3d3a82527fca4dd150cb10feaf220
Parents: d37c32c
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Mar 22 09:15:24 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Fri Mar 23 09:39:39 2018 -0400
----------------------------------------------------------------------
proton-c/bindings/ruby/lib/core/container.rb | 316 +++++++++++-----------
1 file changed, 156 insertions(+), 160 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d65528c0/proton-c/bindings/ruby/lib/core/container.rb
----------------------------------------------------------------------
diff --git a/proton-c/bindings/ruby/lib/core/container.rb b/proton-c/bindings/ruby/lib/core/container.rb
index f8ff032..2d920b4 100644
--- a/proton-c/bindings/ruby/lib/core/container.rb
+++ b/proton-c/bindings/ruby/lib/core/container.rb
@@ -29,163 +29,6 @@ module Qpid::Proton
# One or more threads can call {#run}, events generated by all the listeners and
# connections will be dispatched in the {#run} threads.
class Container
- private
-
- # Container driver applies options and adds container context to events
- class ConnectionTask < Qpid::Proton::HandlerDriver
- def initialize container, io, opts, server=false
- super io, opts[:handler]
- transport.set_server if server
- transport.apply opts
- connection.apply opts
- end
- end
-
- class ListenTask < Listener
-
- def initialize(io, handler, container)
- super
- @closing = @closed = nil
- env = ENV['PN_TRACE_EVT']
- if env && ["true", "1", "yes", "on"].include?(env.downcase)
- @log_prefix = "[0x#{object_id.to_s(16)}](PN_LISTENER_"
- else
- @log_prefix = nil
- end
- dispatch(:on_open);
- end
-
- def process
- return if @closed
- unless @closing
- begin
- return @io.accept, dispatch(:on_accept)
- rescue IO::WaitReadable, Errno::EINTR
- rescue IOError, SystemCallError => e
- close e
- end
- end
- ensure
- if @closing
- @io.close rescue nil
- @closed = true
- dispatch(:on_error, @condition) if @condition
- dispatch(:on_close)
- end
- end
-
- def can_read?() !finished?; end
- def can_write?() false; end
- def finished?() @closed; end
-
- def dispatch(method, *args)
- # TODO aconway 2017-11-27: better logging
- STDERR.puts "#{@log_prefix}#{([method[3..-1].upcase]+args).join ', '})" if @log_prefix
- @handler.__send__(method, self, *args) if @handler && @handler.respond_to?(method)
- end
- end
-
- # Selectable object that can be used to wake IO.select from another thread
- class SelectWaker
- def initialize
- @rd, @wr = IO.pipe
- @lock = Mutex.new
- @set = false
- end
-
- def to_io() @rd; end
-
- def wake
- @lock.synchronize do
- return if @set # Don't write if already has data
- @set = true
- begin @wr.write_nonblock('x') rescue IO::WaitWritable end
- end
- end
-
- def reset
- @lock.synchronize do
- return unless @set
- begin @rd.read_nonblock(1) rescue IO::WaitReadable end
- @set = false
- end
- end
-
- def close
- @rd.close
- @wr.close
- end
- end
-
- def next_tick_due(x, now)
- nt = x.respond_to?(:next_tick) && x.next_tick
- nt && (nt <= now)
- end
-
- def next_tick_min(x, t)
- nt = x.respond_to?(:next_tick) && x.next_tick
- nt if !t || (nt < t)
- end
-
- # Rescue any exception raised by the block and stop the container.
- def maybe_panic
- begin
- yield
- rescue Exception => e
- stop(nil, e)
- end
- end
-
- # Handle a single item from the @work queue, this is the heart of the #run loop.
- def run_one(task)
- case task
-
- when :start
- @adapter.on_container_start(self) if @adapter.respond_to? :on_container_start
-
- when Container
- r, w = [@wake], []
- next_tick = nil
- @lock.synchronize do
- @selectable.each do |s|
- r << s if s.send :can_read?
- w << s if s.send :can_write?
- next_tick = next_tick_min(s, next_tick)
- end
- end
- now = Time.now
- timeout = ((next_tick > now) ? next_tick - now : 0) if next_tick
- r, w = IO.select(r, w, nil, timeout)
- now = Time.now
- selected = Set.new(r).delete(@wake)
- selected.merge(w) if w
- selected.merge(@selectable.select { |s| next_tick_due(s, now) })
- @wake.reset
- stop_select = nil
- @lock.synchronize do
- if stop_select = @stopped # close everything
- selected += @selectable
- selected.each { |s| s.close @stop_err }
- @wake.close
- end
- @selectable -= selected # Remove selected tasks
- end
- selected.each { |s| @work << s } # Queue up tasks needing #process
- @work << self unless stop_select
-
- when ConnectionTask then
- maybe_panic { task.process }
- rearm task
-
- when ListenTask then
- io, opts = maybe_panic { task.process }
- add(connection_driver(io, opts, true)) if io
- rearm task
- end
- end
-
- public
-
# Error raised if the container is used after {#stop} has been called.
class StoppedError < RuntimeError
def initialize(*args) super("container has been stopped"); end
@@ -261,7 +104,7 @@ module Qpid::Proton
# Number of threads in {#run}
# @return [Bool] {#run} thread count
- def running; @lock.synchronize { @running }; end
+ def running() @lock.synchronize { @running }; end
# Open an AMQP connection.
#
@@ -391,7 +234,160 @@ module Qpid::Proton
@wake.wake
end
- protected
+ private
+
+ # Container driver applies options and adds container context to events
+ class ConnectionTask < Qpid::Proton::HandlerDriver
+ def initialize container, io, opts, server=false
+ super io, opts[:handler]
+ transport.set_server if server
+ transport.apply opts
+ connection.apply opts
+ end
+ end
+
+ class ListenTask < Listener
+
+ def initialize(io, handler, container)
+ super
+ @closing = @closed = nil
+ env = ENV['PN_TRACE_EVT']
+ if env && ["true", "1", "yes", "on"].include?(env.downcase)
+ @log_prefix = "[0x#{object_id.to_s(16)}](PN_LISTENER_"
+ else
+ @log_prefix = nil
+ end
+ dispatch(:on_open);
+ end
+
+ def process
+ return if @closed
+ unless @closing
+ begin
+ return @io.accept, dispatch(:on_accept)
+ rescue IO::WaitReadable, Errno::EINTR
+ rescue IOError, SystemCallError => e
+ close e
+ end
+ end
+ ensure
+ if @closing
+ @io.close rescue nil
+ @closed = true
+ dispatch(:on_error, @condition) if @condition
+ dispatch(:on_close)
+ end
+ end
+
+ def can_read?() !finished?; end
+ def can_write?() false; end
+ def finished?() @closed; end
+
+ def dispatch(method, *args)
+ # TODO aconway 2017-11-27: better logging
+ STDERR.puts "#{@log_prefix}#{([method[3..-1].upcase]+args).join ', '})" if @log_prefix
+ @handler.__send__(method, self, *args) if @handler && @handler.respond_to?(method)
+ end
+ end
+
+ # Selectable object that can be used to wake IO.select from another thread
+ class SelectWaker
+ def initialize
+ @rd, @wr = IO.pipe
+ @lock = Mutex.new
+ @set = false
+ end
+
+ def to_io() @rd; end
+
+ def wake
+ @lock.synchronize do
+ return if @set # Don't write if already has data
+ @set = true
+ begin @wr.write_nonblock('x') rescue IO::WaitWritable end
+ end
+ end
+
+ def reset
+ @lock.synchronize do
+ return unless @set
+ begin @rd.read_nonblock(1) rescue IO::WaitReadable end
+ @set = false
+ end
+ end
+
+ def close
+ @rd.close
+ @wr.close
+ end
+ end
+
+ # Handle a single item from the @work queue, this is the heart of the #run loop.
+ def run_one(task)
+ case task
+
+ when :start
+ @adapter.on_container_start(self) if @adapter.respond_to? :on_container_start
+
+ when Container
+ r, w = [@wake], []
+ next_tick = nil
+ @lock.synchronize do
+ @selectable.each do |s|
+ r << s if s.send :can_read?
+ w << s if s.send :can_write?
+ next_tick = next_tick_min(s, next_tick)
+ end
+ end
+ now = Time.now
+ timeout = ((next_tick > now) ? next_tick - now : 0) if next_tick
+ r, w = IO.select(r, w, nil, timeout)
+ now = Time.now
+ selected = Set.new(r).delete(@wake)
+ selected.merge(w) if w
+ selected.merge(@selectable.select { |s| next_tick_due(s, now) })
+ @wake.reset
+ stop_select = nil
+ @lock.synchronize do
+ if stop_select = @stopped # close everything
+ selected += @selectable
+ selected.each { |s| s.close @stop_err }
+ @wake.close
+ end
+ @selectable -= selected # Remove selected tasks
+ end
+ selected.each { |s| @work << s } # Queue up tasks needing #process
+ @work << self unless stop_select
+
+ when ConnectionTask then
+ maybe_panic { task.process }
+ rearm task
+
+ when ListenTask then
+ io, opts = maybe_panic { task.process }
+ add(connection_driver(io, opts, true)) if io
+ rearm task
+ end
+ end
+
+ def next_tick_due(x, now)
+ nt = x.respond_to?(:next_tick) && x.next_tick
+ nt && (nt <= now)
+ end
+
+ def next_tick_min(x, t)
+ nt = x.respond_to?(:next_tick) && x.next_tick
+ nt if !t || (nt < t)
+ end
+
+ # Rescue any exception raised by the block and stop the container.
+ def maybe_panic
+ begin
+ yield
+ rescue Exception => e
+ stop(nil, e)
+ end
+ end
# Normally if we add work we need to set a wakeup to ensure a single #run
# thread doesn't get stuck in select while there is other work on the queue.
@@ -439,7 +435,7 @@ module Qpid::Proton
end
end
- def not_stopped; raise StoppedError if @lock.synchronize { @stopped }; end
+ def not_stopped() raise StoppedError if @lock.synchronize { @stopped }; end
end
end
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org