You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by mc...@apache.org on 2014/12/18 14:17:14 UTC

[2/4] qpid-proton git commit: PROTON-471: Example for Messenger->Work in Perl.

PROTON-471: Example for Messenger->Work in Perl.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/9186cb6b
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/9186cb6b
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/9186cb6b

Branch: refs/heads/master
Commit: 9186cb6bd3f3b7a1b17d9fe9c1fd28eb83073322
Parents: 3b20007
Author: Darryl L. Pierce <mc...@gmail.com>
Authored: Fri Dec 12 17:15:58 2014 -0500
Committer: Darryl L. Pierce <mc...@gmail.com>
Committed: Thu Dec 18 08:16:45 2014 -0500

----------------------------------------------------------------------
 examples/messenger/perl/async.pm      | 120 +++++++++++++++++++++++++++++
 examples/messenger/perl/recv_async.pl |  84 ++++++++++++++++++++
 examples/messenger/perl/send_async.pl |  97 +++++++++++++++++++++++
 3 files changed, 301 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9186cb6b/examples/messenger/perl/async.pm
----------------------------------------------------------------------
diff --git a/examples/messenger/perl/async.pm b/examples/messenger/perl/async.pm
new file mode 100644
index 0000000..5cd350b
--- /dev/null
+++ b/examples/messenger/perl/async.pm
@@ -0,0 +1,120 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+use qpid_proton;
+
+package async::CallbackAdapter;
+
+# Constructor.
+#
+# Arguments (positional): the class name, then the messenger object that
+# the adapter will drive. The messenger has set_blocking(0) called on it
+# and both of its windows set to 1024 before the adapter is built.
+#
+# A single qpid::proton::Message is created and stored under both the
+# _message and _incoming keys, so the two keys refer to the same object.
+# Returns the blessed adapter.
+sub new {
+    my $class  = $_[0];
+    my $driver = $_[1];
+
+    $driver->set_blocking(0);
+    $driver->set_incoming_window(1024);
+    $driver->set_outgoing_window(1024);
+
+    my $scratch = qpid::proton::Message->new();
+
+    return bless {
+        _messenger => $driver,
+        _message   => $scratch,
+        _incoming  => $scratch,
+        _tracked   => {},
+    }, $class;
+}
+
+# Main loop of the adapter.
+#
+# Starts the messenger, invokes the on_start hook, then repeatedly calls
+# work / process_outgoing / process_incoming until stop() has cleared the
+# _running flag. The loop body always executes at least once. Afterwards
+# the messenger is asked to stop and the same three steps are repeated
+# until the messenger reports stopped(); finally the on_stop hook is
+# invoked. The on_start and on_stop hooks are not defined in this package.
+sub run {
+    my $self   = shift;
+    my $driver = $self->{_messenger};
+
+    $self->{_running} = 1;
+
+    $driver->start();
+    $self->on_start();
+
+    # Test the flag after each pass so one pass is always performed.
+    while (1) {
+        $driver->work;
+        $self->process_outgoing;
+        $self->process_incoming;
+        last unless $self->{_running};
+    }
+
+    $driver->stop();
+
+    # Keep pumping until the messenger has finished shutting down.
+    until ($driver->stopped()) {
+        $driver->work;
+        $self->process_outgoing;
+        $self->process_incoming;
+    }
+
+    $self->on_stop();
+}
+
+# Requests that run() leave its main loop by clearing the _running flag.
+# The flag is only examined by run() after the current pass completes.
+sub stop {
+    my $self = $_[0];
+
+    return $self->{_running} = 0;
+}
+
+# Checks every delivery registered by send() and, once its status is no
+# longer pending, reports the status to the registered callback, settles
+# the delivery and forgets it.
+#
+# Each entry of $self->{_tracked} is a hash reference of the form
+#   { tracker => <value returned by put>, on_status => <method name> }
+# as stored by send() below.
+#
+# NOTE(review): assumes the messenger offers get_status(<tracker>) and that
+# qpid::proton::Tracker::PENDING (referenced by the original code) is the
+# pending status value -- confirm against the binding.
+sub process_outgoing {
+    my ($self) = @_;
+    my $messenger = $self->{_messenger};
+    my $tracked   = $self->{_tracked};
+
+    # keys() returns a snapshot, so deleting entries (or a callback adding
+    # new ones through send) while looping is safe.
+    foreach my $key (keys %{$tracked}) {
+        my $entry = $tracked->{$key};
+
+        # Drop placeholders that carry no usable callback information.
+        if (ref($entry) ne 'HASH' || !defined($entry->{on_status})) {
+            delete $tracked->{$key};
+            next;
+        }
+
+        my $tracker   = $entry->{tracker};
+        my $on_status = $entry->{on_status};
+        my $status    = $messenger->get_status($tracker);
+
+        # Nothing to report yet; look again on the next pass.
+        next if (!defined($status));
+        next if ($status eq qpid::proton::Tracker::PENDING);
+
+        $self->$on_status($status);
+        $messenger->settle($tracker);
+
+        # Remove the key itself; undef-ing the value would leave the key
+        # behind and the hash would grow without bound.
+        delete $tracked->{$key};
+    }
+}
+
+# Drains the messenger's incoming queue. Each message is decoded into the
+# adapter's shared message object ($self->{_message}), passed to the
+# on_receive hook and then accepted using the tracker returned by get().
+# Because the same message object is reused, on_receive must copy any
+# field it needs before modifying that object.
+sub process_incoming {
+    my ($self) = @_;
+    my $messenger = $self->{_messenger};
+    my $message   = $self->{_message};
+
+    while ($messenger->incoming > 0) {
+        my $t = $messenger->get($message);
+
+        $self->on_receive($message);
+        $messenger->accept($t);
+    }
+}
+
+# Queues a message for delivery.
+#
+# Arguments (positional, after the invocant):
+#   message   - the message handed to the messenger's put()
+#   on_status - optional name of a method on this object; when given (and
+#               true) the delivery is remembered so process_outgoing can
+#               call that method with the delivery's status.
+#
+# Returns whatever put() returned.
+sub send {
+    my ($self, $message, $on_status) = @_;
+    my $messenger = $self->{_messenger};
+    my $tracked   = $self->{_tracked};
+
+    my $tracker = $messenger->put($message);
+
+    # Keep the tracker value itself: a hash key is only its string form and
+    # cannot be passed back to the messenger later.
+    if ($on_status && defined($tracker)) {
+        $tracked->{$tracker} = {
+            tracker   => $tracker,
+            on_status => $on_status,
+        };
+    }
+
+    return $tracker;
+}
+
+
+1;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9186cb6b/examples/messenger/perl/recv_async.pl
----------------------------------------------------------------------
diff --git a/examples/messenger/perl/recv_async.pl b/examples/messenger/perl/recv_async.pl
new file mode 100755
index 0000000..9a2195a
--- /dev/null
+++ b/examples/messenger/perl/recv_async.pl
@@ -0,0 +1,84 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+use qpid_proton;
+use async;
+
+package async::Receiver;
+
+# Inherit the event loop (run, stop, send, ...) from the callback adapter.
+our @ISA = ('async::CallbackAdapter');
+
+# Start-up hook: subscribes the messenger to one or more source addresses
+# and then asks it to start receiving.
+#
+# Arguments (positional, after the invocant): zero or more address
+# strings. False values are ignored; when no usable address is given the
+# default "amqp://~0.0.0.0" is used, which is also what happens when the
+# adapter's run() calls this hook without arguments.
+sub on_start {
+    my ($self, @sources) = @_;
+    my $messenger = $self->{_messenger};
+
+    @sources = grep { $_ } @sources;
+    @sources = ("amqp://~0.0.0.0") if (!@sources);
+
+    # Lexical loop variable: the original leaked a package global.
+    foreach my $source (@sources) {
+        $messenger->subscribe($source);
+    }
+
+    $messenger->receive();
+}
+
+# Receive hook: prints the body of the incoming message (or its subject
+# when there is no body), stops the adapter when the body is "die", and
+# sends a "Reply for <body>" message when a reply-to address is present.
+#
+# Arguments (positional, after the invocant): the received message.
+sub on_receive {
+    my ($self, $msg) = @_;
+    my $message = $self->{_message};
+
+    # The adapter passes its shared message object to this hook, so $msg
+    # and $message are the same object. Copy everything needed from the
+    # incoming message before clearing it for the reply.
+    my $body     = $msg->get_body;
+    my $reply_to = $msg->get_reply_to;
+    my $text;
+
+    if (defined($body)) {
+        $text = $body;
+        if ($text eq "die") {
+            $self->stop;
+        }
+    } else {
+        $text = $msg->get_subject;
+    }
+
+    $text = "" if (!defined($text));
+
+    print "Received: $text\n";
+
+    if ($reply_to) {
+        print "Sending reply to: " . $reply_to . "\n";
+        $message->clear;
+        $message->set_address($reply_to);
+        # Concatenate: a second argument to set_body is not part of the text.
+        $message->set_body("Reply for " . (defined($body) ? $body : ""));
+        $self->send($message);
+    }
+}
+
+# Status hook: prints the status value it is given on a line of its own.
+sub on_status {
+    my $self   = shift;
+    my $status = shift;
+
+    my @parts = ("Status: ", $status, "\n");
+    print @parts;
+}
+
+# Stop hook: announces that the adapter has finished.
+sub on_stop {
+    my $notice = "Stopped.\n";
+    print $notice;
+}
+
+package main;
+
+# Build the messenger and the receiving adapter, then hand control to the
+# adapter's event loop. Direct Class->new(...) calls are used instead of
+# the indirect-object form "new Class(...)", which Perl can misparse.
+our $messenger = qpid::proton::Messenger->new();
+our $app = async::Receiver->new($messenger);
+
+$app->run();

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9186cb6b/examples/messenger/perl/send_async.pl
----------------------------------------------------------------------
diff --git a/examples/messenger/perl/send_async.pl b/examples/messenger/perl/send_async.pl
new file mode 100644
index 0000000..2f9408a
--- /dev/null
+++ b/examples/messenger/perl/send_async.pl
@@ -0,0 +1,97 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+use Getopt::Std;
+use qpid_proton;
+use async;
+
+$Getopt::Std::STANDARD_HELP_VERSION = 1;
+
+# Deliberately prints nothing.
+sub VERSION_MESSAGE() {
+    return;
+}
+
+# Prints the command-line usage summary for send_async.pl to the
+# currently selected output handle.
+sub HELP_MESSAGE() {
+    my $usage = join('',
+        "Usage: send_async.pl [OPTIONS] <msg_0> <msg_1> ...\n",
+        "Options:\n",
+        "\t-a     - the message address (def. amqp://0.0.0.0)\n",
+        "\t-r     - the reply-to address: //<domain>[/<name>]\n",
+        "\t msg_# - a text string to send\n",
+    );
+
+    print $usage;
+}
+
+# Parse the -a (address) and -r (reply-to) switches. On an invalid switch
+# print the usage text and exit with a failure status; the original called
+# an undefined usage() routine and declared a misspelled %optons hash.
+my %options = ();
+getopts("a:r:", \%options) or do {
+    HELP_MESSAGE();
+    exit(1);
+};
+
+# Declared with "our" so the async::Sender package further down in this
+# file can read them.
+our $address = $options{a} || "amqp://0.0.0.0";
+our $replyto = $options{r} || "~/#";
+
+package async::Sender;
+
+# Inherit the event loop (run, stop, send, ...) from the callback adapter.
+our @ISA = ('async::CallbackAdapter');
+
+# Start-up hook: sends one message per body text and then asks the
+# messenger to receive so that replies can arrive.
+#
+# Arguments (positional, after the invocant): zero or more body strings.
+# False values are ignored. When none are given, the arguments left on the
+# command line after option parsing (@ARGV) are sent, as the usage text
+# promises; if there are none either, "Hello world!" is sent.
+#
+# The target address and reply-to come from the file-level $address and
+# $replyto variables set from the -a and -r switches.
+sub on_start {
+    my ($self, @bodies) = @_;
+    my $message   = $self->{_message};
+    my $messenger = $self->{_messenger};
+
+    @bodies = grep { $_ } @bodies;
+    @bodies = @ARGV if (!@bodies);
+    @bodies = ("Hello world!") if (!@bodies);
+
+    print "Started\n";
+
+    $message->clear;
+    # Honour -a instead of a hard-coded address.
+    $message->set_address($address);
+    # The message object is reused for every body, so this is set once.
+    $message->set_reply_to($replyto) if (defined($replyto));
+
+    foreach my $body (@bodies) {
+        $message->set_body($body);
+        $self->send($message, "on_status");
+    }
+
+    $messenger->receive() if (defined($replyto));
+}
+
+# Status hook: prints the status value it is given; a false or missing
+# status is printed as an empty string.
+sub on_status {
+    my $self   = shift;
+    my $status = shift;
+
+    $status = "" unless $status;
+
+    my @parts = ("Status: ", $status, "\n");
+    print @parts;
+}
+
+# Receive hook: prints the body of the received message ("[empty]" when
+# the body is false) and then stops the adapter.
+sub on_receive {
+    my ($self, $message) = @_;
+
+    my $text = $message->get_body;
+    $text = "[empty]" unless $text;
+
+    print "Received: $text\n";
+
+    $self->stop();
+}
+
+# Stop hook: announces that the adapter has finished.
+sub on_stop {
+    my $notice = "Stopped\n";
+    print $notice;
+}
+
+
+package main;
+
+# Build the messenger and the sending adapter, then hand control to the
+# adapter's event loop. Direct Class->new(...) calls are used instead of
+# the indirect-object form "new Class(...)", which Perl can misparse.
+our $msgr = qpid::proton::Messenger->new();
+our $app = async::Sender->new($msgr);
+
+$app->run;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org