You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spamassassin.apache.org by jm...@apache.org on 2006/04/10 16:08:51 UTC
svn commit: r392955 -
/spamassassin/branches/bug-3109-shortcircuiting/lib/Mail/SpamAssassin/AsyncLoop.pm
Author: jm
Date: Mon Apr 10 07:08:50 2006
New Revision: 392955
URL: http://svn.apache.org/viewcvs?rev=392955&view=rev
Log:
bug 4860: consolidate DNSBL-lookup async infrastructure, allow early exit features for short-circuiting
Added:
spamassassin/branches/bug-3109-shortcircuiting/lib/Mail/SpamAssassin/AsyncLoop.pm
Added: spamassassin/branches/bug-3109-shortcircuiting/lib/Mail/SpamAssassin/AsyncLoop.pm
URL: http://svn.apache.org/viewcvs/spamassassin/branches/bug-3109-shortcircuiting/lib/Mail/SpamAssassin/AsyncLoop.pm?rev=392955&view=auto
==============================================================================
--- spamassassin/branches/bug-3109-shortcircuiting/lib/Mail/SpamAssassin/AsyncLoop.pm (added)
+++ spamassassin/branches/bug-3109-shortcircuiting/lib/Mail/SpamAssassin/AsyncLoop.pm Mon Apr 10 07:08:50 2006
@@ -0,0 +1,328 @@
+# <@LICENSE>
+# Copyright 2004 Apache Software Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# </...@LICENSE>
+
+=head1 NAME
+
+Mail::SpamAssassin::AsyncLoop - scanner asynchronous event loop
+
+=head1 DESCRIPTION
+
+An asynchronous event loop used for long-running operations, performed "in the
+background" during the Mail::SpamAssassin::check() scan operation, such as DNS
+blocklist lookups.
+
+=head1 METHODS
+
+=over 4
+
+=cut
+
+package Mail::SpamAssassin::AsyncLoop;
+
+use strict;
+use warnings;
+use bytes;
+
+use Mail::SpamAssassin;
+use Mail::SpamAssassin::Logger;
+
+our @ISA = qw();
+
+#############################################################################
+
+sub new {
+ my $class = shift;
+ $class = ref($class) || $class;
+
+ my ($main) = @_;
+ my $self = {
+ main => $main,
+ last_count => 0,
+ times_count_was_same => 0,
+ queries_started => 0,
+ queries_completed => 0,
+ pending_lookups => { }
+ };
+
+ bless ($self, $class);
+ $self;
+}
+
+# ---------------------------------------------------------------------------
+
+=item $obj = $async->start_lookup($obj)
+
+Register the start a long-running asynchronous lookup operation. C<$obj>
+is a hash reference containing the following items:
+
+=over 4
+
+=item key (required)
+
+A key string, unique to this lookup. This is what is reported in
+debug messages, used as the key for C<get_lookup()>, etc.
+
+=item id (required)
+
+An ID string, also unique to this lookup. Typically, this is the DNS packet ID
+as returned by DnsResolver's C<bgsend> method. Sadly, the Net::DNS
+architecture forces us to keep a separate ID string for this task instead of
+reusing C<key> -- if you are not using DNS lookups through DnsResolver, it
+should be OK to just reuse C<key>.
+
+=item type (required)
+
+A string, typically one word, used to describe the type of lookup in log
+messages, such as C<DNSBL>, C<MX>, C<TXT>.
+
+=item poll_callback (optional)
+
+A code reference, which will be called periodically during the
+background-processing period. If you will be performing an async lookup on a
+non-DNS-based service, you will need to implement this so that it checks for
+new responses and calls C<set_response_packet()> or C<report_id_complete()> as
+appropriate. DNS-based lookups can leave it undefined, since
+DnsResolver::poll_responses() will be called automatically anyway.
+
+The code reference will be called with one argument, the C<$ent> object.
+
+=item completed_callback (optional)
+
+A code reference, which will be called when the lookup has been reported as
+complete via C<set_response_packet()> or C<report_id_complete()>.
+
+The code reference will be called with one argument, the C<$ent> object.
+
+=back
+
+C<$obj> is returned by this method.
+
+=cut
+
+sub start_lookup {
+ my ($self, $ent) = @_;
+
+ die "oops, no id" unless $ent->{id};
+ die "oops, no key" unless $ent->{key};
+ die "oops, no type" unless $ent->{type};
+
+ $self->{queries_started}++;
+ $self->{pending_lookups}->{$ent->{key}} = $ent;
+ $ent;
+}
+
+# ---------------------------------------------------------------------------
+
+=item $obj = $async->get_lookup($key)
+
+Retrieve the pending-lookup object for the given key C<$key>.
+
+If the lookup is complete, this will return C<undef>.
+
+Note that a lookup is still considered "pending" until C<complete_lookups()> is
+called, even if it has been reported as complete via C<set_response_packet()>
+or C<report_id_complete()>.
+
+=cut
+
+sub get_lookup {
+ my ($self, $key) = @_;
+ return $self->{pending_lookups}->{$key};
+}
+
+# ---------------------------------------------------------------------------
+
+=item @objs = $async->get_pending_lookups()
+
+Retrieve the lookup objects for all pending lookups.
+
+Note that a lookup is still considered "pending" until C<complete_lookups()> is
+called, even if it has been reported as complete via C<set_response_packet()>
+or C<report_id_complete()>.
+
+=cut
+
+sub get_pending_lookups {
+ my ($self) = @_;
+ return values %{$self->{pending_lookups}};
+}
+
+# ---------------------------------------------------------------------------
+
+=item $alldone = $async->complete_lookups()
+
+Perform a poll of the pending lookups, to see if any are completed; if they
+are, their <completed_callback> is called with the entry object for that
+lookup.
+
+If there are no lookups remaining, or if too long has elapsed since any results
+were returned, C<1> is returned, otherwise C<0>.
+
+=cut
+
+sub complete_lookups {
+ my ($self, $timeout) = @_;
+ my %typecount = ();
+ my $stillwaiting = 0;
+
+ my $pending = $self->{pending_lookups};
+ if (scalar keys %{$pending} <= 0) {
+ return 1; # nothing left to do
+ }
+
+ $self->{queries_started} = 0;
+ $self->{queries_completed} = 0;
+
+ # trap this loop in an eval { } block, as Net::DNS could throw
+ # die()s our way; in particular, process_dnsbl_results() has
+ # thrown die()s before (bug 3794).
+ eval {
+
+ my $nfound = $self->{main}->{resolver}->poll_responses($timeout);
+ $nfound ||= 'no';
+ dbg ("async: select found $nfound socks ready");
+
+ foreach my $key (keys %{$pending}) {
+ my $ent = $pending->{$key};
+
+ # call a "poll_callback" sub, if one exists
+ if (defined $ent->{poll_callback}) {
+ $ent->{poll_callback}->($ent);
+ }
+
+ my $type = $ent->{type};
+ if (!exists ($self->{finished}->{$ent->{id}})) {
+ $typecount{$type}++;
+ next;
+ }
+
+ $ent->{response_packet} = delete $self->{finished}->{$ent->{id}};
+ if (defined $ent->{completed_callback}) {
+ $ent->{completed_callback}->($ent);
+ }
+
+ $self->{queries_completed}++;
+ delete $self->{pending_lookups}->{$key};
+ }
+
+ dbg("async: queries completed: ".$self->{queries_completed}.
+ " started: ".$self->{queries_started});
+
+ if (1) {
+ dbg("async: queries active: ".
+ join (' ', map { "$_=$typecount{$_}" } sort keys %typecount)." at ".
+ localtime(time));
+ }
+
+ # ensure we don't get stuck if a request gets lost in the ether.
+ if (!$stillwaiting) {
+ my $numkeys = scalar keys %{$self->{pending_lookups}};
+ if ($numkeys == 0) {
+ $stillwaiting = 0;
+
+ } else {
+ $stillwaiting = 1;
+
+ # avoid looping forever if we haven't got all results.
+ if ($self->{last_count} == $numkeys) {
+ $self->{times_count_was_same}++;
+ if ($self->{times_count_was_same} > 20)
+ {
+ dbg("async: escaping: must have lost requests");
+ $self->abort_remaining_lookups ($self);
+ $stillwaiting = 0;
+ }
+ } else {
+ $self->{last_count} = $numkeys;
+ $self->{times_count_was_same} = 0;
+ }
+ }
+ }
+
+ };
+
+ if ($@) {
+ dbg("async: caught complete_lookups death, aborting: $@");
+ $stillwaiting = 0; # abort remaining
+ }
+
+ return (!$stillwaiting);
+}
+
+# ---------------------------------------------------------------------------
+
+=item $async->abort_remaining_lookups()
+
+Abort any remaining lookups.
+
+=cut
+
+sub abort_remaining_lookups {
+ my ($self) = @_;
+
+ my $pending = $self->{pending_lookups};
+ my $foundone = 0;
+ foreach my $key (keys %{$pending})
+ {
+ if (!$foundone) {
+ dbg("async: aborting remaining lookups");
+ $foundone = 1;
+ }
+
+ delete $pending->{$key};
+ }
+ $self->{main}->{resolver}->bgabort();
+}
+
+# ---------------------------------------------------------------------------
+
+=item $async->set_response_packet($id, $pkt)
+
+Register a "response packet" for a given query. C<$id> is the ID for the
+query, and must match the C<id> supplied in C<start_lookup()>. C<$pkt> is the
+packet object for the response.
+
+If this was called, C<$pkt> will be available in the C<completed_callback>
+function as C<$ent-<gt>{response_packet}>.
+
+One or the other of C<set_response_packet()> or C<report_id_complete()>
+should be called, but not both.
+
+=cut
+
+sub set_response_packet {
+ my ($self, $id, $pkt) = @_;
+ $self->{finished}->{$id} = $pkt;
+}
+
+=item $async->report_id_complete($id)
+
+Register that a query has completed, and is no longer "pending". C<$id> is the
+ID for the query, and must match the C<id> supplied in C<start_lookup()>.
+
+One or the other of C<set_response_packet()> or C<report_id_complete()>
+should be called, but not both.
+
+=cut
+
+sub report_id_complete {
+ my ($self, $id) = @_;
+ $self->{finished}->{$id} = undef;
+}
+
+# ---------------------------------------------------------------------------
+
+1;