You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by mc...@apache.org on 2014/12/18 14:17:14 UTC
[2/4] qpid-proton git commit: PROTON-471: Example for Messenger->Work
in Perl.
PROTON-471: Example for Messenger->Work in Perl.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/9186cb6b
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/9186cb6b
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/9186cb6b
Branch: refs/heads/master
Commit: 9186cb6bd3f3b7a1b17d9fe9c1fd28eb83073322
Parents: 3b20007
Author: Darryl L. Pierce <mc...@gmail.com>
Authored: Fri Dec 12 17:15:58 2014 -0500
Committer: Darryl L. Pierce <mc...@gmail.com>
Committed: Thu Dec 18 08:16:45 2014 -0500
----------------------------------------------------------------------
examples/messenger/perl/async.pm | 120 +++++++++++++++++++++++++++++
examples/messenger/perl/recv_async.pl | 84 ++++++++++++++++++++
examples/messenger/perl/send_async.pl | 97 +++++++++++++++++++++++
3 files changed, 301 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9186cb6b/examples/messenger/perl/async.pm
----------------------------------------------------------------------
diff --git a/examples/messenger/perl/async.pm b/examples/messenger/perl/async.pm
new file mode 100644
index 0000000..5cd350b
--- /dev/null
+++ b/examples/messenger/perl/async.pm
@@ -0,0 +1,120 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+use qpid_proton;
+
+package async::CallbackAdapter;
+
+# Constructor for the callback adapter.
+#
+# Arguments:
+#   $class     - package to bless the new object into
+#   $messenger - messenger instance this adapter will drive
+#
+# The messenger is switched to non-blocking mode and both its incoming and
+# outgoing windows are set to 1024. A single Message object is created and
+# stored under both the _message and _incoming slots; _tracked starts out
+# as an empty hash.
+#
+# Returns the blessed adapter.
+sub new {
+    my ($class, $messenger) = @_;
+
+    $messenger->set_blocking(0);
+    $messenger->set_incoming_window(1024);
+    $messenger->set_outgoing_window(1024);
+
+    # One message object is shared by the _message and _incoming slots.
+    my $shared_message = qpid::proton::Message->new();
+
+    my $self = {
+        _messenger => $messenger,
+        _message   => $shared_message,
+        _incoming  => $shared_message,
+        _tracked   => {},
+    };
+
+    return bless $self, $class;
+}
+
+# Main loop of the adapter.
+#
+# Starts the messenger, invokes the on_start callback, then repeatedly lets
+# the messenger work and processes outgoing and incoming messages until the
+# _running flag is cleared (see stop). Afterwards the messenger is asked to
+# stop and the same processing continues until it reports that it has
+# stopped; finally the on_stop callback is invoked.
+sub run {
+    my ($self) = @_;
+    my $messenger = $self->{_messenger};
+
+    # A single pass: let the messenger work, then handle both directions.
+    my $pump = sub {
+        $messenger->work;
+        $self->process_outgoing;
+        $self->process_incoming;
+    };
+
+    $self->{_running} = 1;
+
+    $messenger->start();
+    $self->on_start();
+
+    # At least one pass is always made, even if on_start already called stop.
+    while (1) {
+        $pump->();
+        last unless $self->{_running};
+    }
+
+    $messenger->stop();
+
+    # Keep pumping until the messenger reports that it has stopped.
+    $pump->() until $messenger->stopped();
+
+    $self->on_stop();
+}
+
+# Request termination of the main loop: clears the _running flag, which
+# run() checks after each pass.
+sub stop {
+    my $self = shift;
+
+    $self->{_running} = 0;
+}
+
+# Report the delivery status of tracked outgoing messages.
+#
+# Every entry recorded by send() holds the tracker returned by the
+# messenger together with the status callback supplied by the caller. For
+# each entry whose status is no longer pending, the callback is invoked
+# with the status, the tracker is settled and the entry is removed.
+#
+# NOTE(review): get_status() and the qpid::proton::Tracker::PENDING
+# constant are assumed to be provided by the Perl binding (the constant
+# name is the one the original code referenced) -- confirm against the
+# installed qpid_proton module.
+sub process_outgoing {
+    my ($self) = @_;
+    my $messenger = $self->{_messenger};
+    my $tracked = $self->{_tracked};
+
+    # keys() returns a copy of the key list, so deleting entries while
+    # looping is safe.
+    foreach my $key (keys %{$tracked}) {
+        my $entry = $tracked->{$key};
+
+        # Discard anything that is not a well-formed entry from send().
+        if (ref($entry) ne 'HASH' || !defined($entry->{on_status})) {
+            delete $tracked->{$key};
+            next;
+        }
+
+        my $tracker = $entry->{tracker};
+        my $status = $messenger->get_status($tracker);
+
+        # Still in flight: look again on the next pass.
+        next if (defined($status) && $status eq qpid::proton::Tracker::PENDING);
+
+        # The callback may be a method name or a code reference.
+        my $on_status = $entry->{on_status};
+        $self->$on_status($status);
+        $messenger->settle($tracker);
+
+        # delete the settled item
+        delete $tracked->{$key};
+    }
+}
+
+# Drain the messenger's incoming queue.
+#
+# Each pending message is loaded into the adapter's shared Message object,
+# handed to the on_receive callback and then accepted. Because the same
+# Message object is reused, on_receive must copy anything it needs before
+# modifying or clearing it.
+sub process_incoming {
+    my ($self) = @_;
+    my $messenger = $self->{_messenger};
+
+    while ($messenger->incoming > 0) {
+        my $message = $self->{_message};
+        my $t = $messenger->get($message);
+
+        $self->on_receive($message);
+        $messenger->accept($t);
+    }
+}
+
+# Queue a message for sending.
+#
+# Arguments:
+#   $message   - the message to put on the messenger
+#   $on_status - optional callback (method name or code reference) invoked
+#                by process_outgoing once the delivery is no longer pending
+#
+# Returns the tracker produced by the messenger's put().
+sub send {
+    my ($self, $message, $on_status) = @_;
+
+    my $tracker = $self->{_messenger}->put($message);
+
+    # Keep the tracker itself next to the callback: a hash key is only the
+    # tracker's string form and cannot be used to query or settle later.
+    if (defined($tracker) && $on_status) {
+        $self->{_tracked}->{"$tracker"} = {
+            tracker   => $tracker,
+            on_status => $on_status,
+        };
+    }
+
+    return $tracker;
+}
+
+
+1;
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9186cb6b/examples/messenger/perl/recv_async.pl
----------------------------------------------------------------------
diff --git a/examples/messenger/perl/recv_async.pl b/examples/messenger/perl/recv_async.pl
new file mode 100755
index 0000000..9a2195a
--- /dev/null
+++ b/examples/messenger/perl/recv_async.pl
@@ -0,0 +1,84 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+use qpid_proton;
+use async;
+
+package async::Receiver;
+
+# Receiver inherits its constructor and main loop from the callback adapter.
+our @ISA = ('async::CallbackAdapter');
+
+# Startup callback: subscribe to the source address(es) and start receiving.
+#
+# Arguments:
+#   $args - optional; a single source address or a reference to an array of
+#           source addresses. Defaults to "amqp://~0.0.0.0" when missing,
+#           false or an empty array.
+sub on_start {
+    my ($self, $args) = @_;
+    my $messenger = $self->{_messenger};
+
+    $args ||= "amqp://~0.0.0.0";
+
+    # Accept a plain scalar as well as an array reference of addresses.
+    my @sources = ref($args) eq 'ARRAY' ? @{$args} : ($args);
+    @sources = ("amqp://~0.0.0.0") unless @sources;
+
+    foreach my $source (@sources) {
+        $messenger->subscribe($source);
+    }
+
+    $messenger->receive();
+}
+
+# Receive callback: print the message text and, when the message carries a
+# reply-to address, send a reply to it.
+#
+# Arguments:
+#   $msg - the received message
+#
+# A body equal to "die" asks the main loop to stop. When the body is
+# undefined the subject is printed instead.
+sub on_receive {
+    my ($self, $msg) = @_;
+    my $message = $self->{_message};
+
+    # $msg may be the very object stored in $self->{_message} (the adapter's
+    # receive loop hands that object to this callback), so everything needed
+    # for the reply is read before the message is cleared below.
+    my $body = $msg->get_body;
+    my $reply_to = $msg->get_reply_to;
+
+    my $text;
+    if (defined($body)) {
+        $text = $body;
+        if ($text eq "die") {
+            $self->stop;
+        }
+    } else {
+        $text = $msg->get_subject;
+    }
+
+    $text = "" if (!defined($text));
+
+    print "Received: $text\n";
+
+    if ($reply_to) {
+        print "Sending reply to: " . $reply_to . "\n";
+        $message->clear;
+        $message->set_address($reply_to);
+        # Concatenate into one body string; the original passed the received
+        # body as a separate second argument to set_body.
+        $message->set_body("Reply for " . (defined($body) ? $body : ""));
+        $self->send($message);
+    }
+}
+
+# Status callback: print the status value it is given.
+#
+# Arguments:
+#   $status - the status to report
+sub on_status {
+    my ($self, $status) = @_;
+
+    print "Status: ", $status, "\n";
+}
+
+# Shutdown callback: announce that the receiver has stopped.
+sub on_stop {
+    print("Stopped.\n");
+}
+
+package main;
+
+# Create the messenger and the receiver application, then run its main loop.
+# Constructors are called with the arrow form rather than indirect-object
+# syntax ("new Class(...)"), which is ambiguous to the parser.
+our $messenger = qpid::proton::Messenger->new();
+our $app = async::Receiver->new($messenger);
+
+$app->run();
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9186cb6b/examples/messenger/perl/send_async.pl
----------------------------------------------------------------------
diff --git a/examples/messenger/perl/send_async.pl b/examples/messenger/perl/send_async.pl
new file mode 100644
index 0000000..2f9408a
--- /dev/null
+++ b/examples/messenger/perl/send_async.pl
@@ -0,0 +1,97 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+use Getopt::Std;
+use qpid_proton;
+use async;
+
+# Ask Getopt::Std for its standard-conforming --help/--version handling.
+$Getopt::Std::STANDARD_HELP_VERSION = 1;
+
+# Version hook looked up by Getopt::Std; intentionally prints nothing.
+sub VERSION_MESSAGE() {
+    return;
+}
+
+# Help hook looked up by Getopt::Std: prints the usage summary for this
+# script on the currently selected output handle.
+sub HELP_MESSAGE() {
+    my @usage = (
+        "Usage: send_async.pl [OPTIONS] <msg_0> <msg_1> ...\n",
+        "Options:\n",
+        "\t-a - the message address (def. amqp://0.0.0.0)\n",
+        "\t-r - the reply-to address: //<domain>[/<name>]\n",
+        "\t msg_# - a text string to send\n",
+    );
+
+    print join("", @usage);
+}
+
+# Parse the command line: -a <address> and -r <reply-to>.
+# On an invalid option, print the usage text and exit with a failure status.
+my %options = ();
+getopts("a:r:", \%options) or do { HELP_MESSAGE(); exit(1); };
+
+# Destination address and reply-to address, with their defaults.
+our $address = $options{a} || "amqp://0.0.0.0";
+our $replyto = $options{r} || "~/#";
+
+package async::Sender;
+
+# Sender inherits its constructor and main loop from the callback adapter.
+our @ISA = ('async::CallbackAdapter');
+
+# Startup callback: queue the outgoing messages and start receiving replies.
+#
+# Arguments:
+#   $args - optional; a single message body or a reference to an array of
+#           bodies. When omitted, the remaining command-line arguments
+#           (@ARGV) are sent; if there are none, "Hello world!" is sent.
+#
+# Messages go to the address selected with -a ($address) and carry the
+# reply-to address selected with -r ($replyto). Each message is sent with
+# "on_status" as its status callback.
+sub on_start {
+    my ($self, $args) = @_;
+    my $message = $self->{_message};
+    my $messenger = $self->{_messenger};
+
+    # Pick the message bodies: explicit argument first, then the command
+    # line, then the built-in default.
+    my @bodies = ref($args) eq 'ARRAY' ? @{$args}
+               : $args                 ? ($args)
+               :                         @ARGV;
+    @bodies = ("Hello world!") unless @bodies;
+
+    print "Started\n";
+
+    $message->clear;
+    # Honor the -a option instead of a hard-coded destination.
+    $message->set_address($address || "amqp://0.0.0.0");
+    $message->set_reply_to($replyto) if (defined($replyto));
+
+    foreach my $body (@bodies) {
+        $message->set_body($body);
+        $self->send($message, "on_status");
+    }
+
+    $messenger->receive() if (defined($replyto));
+}
+
+# Status callback: print the status value, or an empty string when the
+# status is missing or false.
+#
+# Arguments:
+#   $status - the status to report
+sub on_status {
+    my ($self, $status) = @_;
+
+    $status ||= "";
+
+    print "Status: ", $status, "\n";
+}
+
+# Receive callback: print the body of the received message ("[empty]" when
+# the body is missing or false) and then ask the main loop to stop.
+#
+# Arguments:
+#   $message - the received message
+sub on_receive {
+    my ($self, $message) = @_;
+
+    my $text = $message->get_body;
+    $text = "[empty]" unless $text;
+
+    print "Received: " . $text . "\n";
+
+    $self->stop();
+}
+
+# Shutdown callback: announce that the sender has stopped.
+sub on_stop {
+    print("Stopped\n");
+}
+
+
+package main;
+
+# Create the messenger and the sender application, then run its main loop.
+# Both constructors use the arrow form; indirect-object syntax
+# ("new Class(...)") is ambiguous to the parser.
+our $msgr = qpid::proton::Messenger->new();
+our $app = async::Sender->new($msgr);
+
+$app->run;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org