You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spamassassin.apache.org by mm...@apache.org on 2013/05/31 16:17:49 UTC
svn commit: r1488244 - in /spamassassin/trunk/lib/Mail/SpamAssassin:
BayesStore.pm BayesStore/Redis.pm Conf.pm Plugin/Bayes.pm
Author: mmartinec
Date: Fri May 31 14:17:49 2013
New Revision: 1488244
URL: http://svn.apache.org/r1488244
Log:
Bug 6942: Redis bayes storage module fixes and updates
Modified:
spamassassin/trunk/lib/Mail/SpamAssassin/BayesStore.pm
spamassassin/trunk/lib/Mail/SpamAssassin/BayesStore/Redis.pm
spamassassin/trunk/lib/Mail/SpamAssassin/Conf.pm
spamassassin/trunk/lib/Mail/SpamAssassin/Plugin/Bayes.pm
Modified: spamassassin/trunk/lib/Mail/SpamAssassin/BayesStore.pm
URL: http://svn.apache.org/viewvc/spamassassin/trunk/lib/Mail/SpamAssassin/BayesStore.pm?rev=1488244&r1=1488243&r2=1488244&view=diff
==============================================================================
--- spamassassin/trunk/lib/Mail/SpamAssassin/BayesStore.pm (original)
+++ spamassassin/trunk/lib/Mail/SpamAssassin/BayesStore.pm Fri May 31 14:17:49 2013
@@ -117,6 +117,33 @@ sub read_db_configs {
$self->{bayes}->read_db_configs();
}
+=item prefork_init
+
+public instance (Boolean) prefork_init ()
+
+Description:
+This optional method is called in the parent process shortly before
+forking off child processes.
+
+=cut
+
+# sub prefork_init {
+# my ($self) = @_;
+# }
+
+=item spamd_child_init
+
+public instance (Boolean) spamd_child_init ()
+
+Description:
+This optional method is called in a child process shortly after being spawned.
+
+=cut
+
+# sub spamd_child_init {
+# my ($self) = @_;
+# }
+
=item tie_db_readonly
public instance (Boolean) tie_db_readonly ()
Modified: spamassassin/trunk/lib/Mail/SpamAssassin/BayesStore/Redis.pm
URL: http://svn.apache.org/viewvc/spamassassin/trunk/lib/Mail/SpamAssassin/BayesStore/Redis.pm?rev=1488244&r1=1488243&r2=1488244&view=diff
==============================================================================
--- spamassassin/trunk/lib/Mail/SpamAssassin/BayesStore/Redis.pm (original)
+++ spamassassin/trunk/lib/Mail/SpamAssassin/BayesStore/Redis.pm Fri May 31 14:17:49 2013
@@ -26,7 +26,7 @@ Mail::SpamAssassin::BayesStore::Redis -
This module implementes a Redis based bayesian storage module.
!! IT IS STILL EXPERIMENTAL AND SUBJECT TO CHANGE !!
-These config variables have been hijacked for our purposes:
+The following config variable has been hijacked for our purposes:
bayes_sql_dsn
@@ -37,15 +37,22 @@ These config variables have been hijacke
To use non-default database id, use "database=x". This is not passed
to new(), but specially handled to call Redis->select($id).
- bayes_expiry_max_db_size
+ bayes_token_ttl
- Controls token/seen expiry (ttl value in SECONDS, sent as is to Redis).
- Default 150000 (41 hours) is sane (that's why we abuse this variable),
- but you should try atleast 604800 (1 week).
-
-Expiry is done internally in Redis using EXPIRY value mentioned above. This
-is why --force-expire etc does nothing and token counts and atime values are
-shown zero in statistics.
+ Controls token expiry (ttl value in SECONDS, sent as is to Redis)
+ when bayes_auto_expire is true. Default value is 3 weeks (but check
+ Mail::SpamAssassin::Conf.pm to make sure).
+
+ bayes_seen_ttl
+
+ Controls 'seen' expiry (ttl value in SECONDS, sent as is to Redis)
+ when bayes_auto_expire is true. Default value is 8 days (but check
+ Mail::SpamAssassin::Conf.pm to make sure).
+
+Expiry is done internally in Redis using the *_ttl values mentioned above,
+but only if bayes_auto_expire is true (which is the default). This is
+why --force-expire etc. does nothing, and why token counts and atime
+values are shown as zero in statistics.
=cut
@@ -95,12 +102,13 @@ sub new {
my $self = $class->SUPER::new(@_);
unless (HAS_REDIS) {
- dbg("bayes: unable to connect to database: DBI module not available: $!");
+ dbg("bayes: unable to connect to database: Redis module not available");
}
+ my $bconf = $self->{bayes}->{conf};
push @{$self->{redis_conf}}, 'encoding' => undef;
- foreach (split(';', $self->{bayes}->{conf}->{bayes_sql_dsn})) {
+ foreach (split(';', $bconf->{bayes_sql_dsn})) {
my ($a, $b) = split('=');
unless (defined $b) {
warn("bayes: invalid bayes_sql_dsn config\n");
@@ -115,10 +123,11 @@ sub new {
}
}
- $self->{expire_seen} =
- $self->{bayes}->{conf}->{bayes_expiry_max_db_size} || 150000;
- $self->{expire_token} =
- $self->{bayes}->{conf}->{bayes_expiry_max_db_size} || 150000;
+ $self->{expire_token} = $bconf->{bayes_token_ttl};
+ $self->{expire_seen} = $bconf->{bayes_seen_ttl};
+ if (!$bconf->{bayes_auto_expire}) {
+ $self->{expire_token} = $self->{expire_seen} = undef;
+ }
$self->{supported_db_version} = 3;
$self->{is_really_open} = 0;
@@ -132,6 +141,80 @@ sub new {
return $self;
}
+sub DESTROY {
+ my($self) = @_;
+ if ($self->{is_really_open} && $self->{redis}) {
+ eval { $self->{redis}->quit }; # close session, ignoring any failures
+ }
+}
+
+=head2 prefork_init
+
+public instance (Boolean) prefork_init ();
+
+Description:
+This optional method is called in the parent process shortly before
+forking off child processes.
+
+=cut
+
+sub prefork_init {
+ my ($self) = @_;
+
+ HAS_REDIS or return;
+
+ # Each child process must establish its own connection with a Redis server,
+ # re-using a common forked socket leads to serious trouble (garbled data).
+ #
+ # Parent process may have established its connection during startup, but
+ # it is no longer of any use by now, so we shut it down here in the master
+ # process, letting a spawned child process re-establish it later.
+
+ if ($self->{is_really_open}) {
+ dbg("bayes: prefork_init, closing a session ".
+ "with a Redis server in a parent process");
+ $self->untie_db;
+ if ($self->{redis}) {
+ eval { $self->{redis}->quit }; # close session, ignoring any failures
+ }
+ undef $self->{redis};
+ $self->{is_really_open} = 0;
+ }
+}
+
+=head2 spamd_child_init
+
+public instance (Boolean) spamd_child_init ();
+
+Description:
+This optional method is called in a child process shortly after being spawned.
+
+=cut
+
+sub spamd_child_init {
+ my ($self) = @_;
+
+ HAS_REDIS or return;
+
+ # Each child process must establish its own connection with a Redis server,
+ # re-using a common forked socket leads to serious trouble (garbled data).
+ #
+ # Just in case the parent master process did not call prefork_init() above,
+ # we try to silently renounce the use of existing cloned connection here.
+ # As the prefork_init plugin callback has only been introduced in
+ # SpamAssassin 3.4.0, this situation can arise if some third-party
+ # software (or a pre-3.4.0 version of spamd) is somehow using this plugin.
+ # Better safe than sorry...
+
+ if ($self->{is_really_open}) {
+ dbg("bayes: spamd_child_init, closing a parent's session ".
+ "with a Redis server in a child process");
+ $self->untie_db;
+ undef $self->{redis}; # just drop it, don't shut down parent's session
+ $self->{is_really_open} = 0;
+ }
+}
+
=head2 tie_db_readonly
public instance (Boolean) tie_db_readonly ();
@@ -147,10 +230,15 @@ sub tie_db_readonly {
return 0 unless (HAS_REDIS);
- my $result = $self->{is_really_open} || $self->_open_db();
- $self->{is_writable} = 0 if $result;
+ my $really_open = $self->{is_really_open};
+ if ($really_open) {
+ $self->{is_officially_open} = 1;
+ } else {
+ $really_open = $self->_open_db();
+ }
+ $self->{is_writable} = 0;
- return $result;
+ return $really_open;
}
=head2 tie_db_writable
@@ -169,10 +257,16 @@ sub tie_db_writable {
return 0 unless (HAS_REDIS);
- my $result = $self->{is_really_open} || $self->_open_db();
- $self->{is_writable} = 1 if $result;
+ my $really_open = $self->{is_really_open};
+ if ($really_open) {
+ $self->{is_officially_open} = 1;
+ } else {
+ $really_open = $self->_open_db();
+ }
+
+ $self->{is_writable} = 1 if $really_open;
- return $result;
+ return $really_open;
}
=head2 _open_db
@@ -194,13 +288,15 @@ sub _open_db {
Redis->VERSION);
if ($self->{is_really_open}) {
- $self->{is_officially_open} = 1;
- return 1;
+ $self->{is_officially_open} = 1;
+ return 1;
}
$self->read_db_configs();
my $err = $self->{timer}->run_and_catch(sub {
+ $self->{opened_from_pid} = $$;
+ # will keep a persistent session open to a redis server
$self->{redis} = Redis->new(@{$self->{redis_conf}});
$self->{redis}->select($self->{db_id}) if defined $self->{db_id};
});
@@ -210,7 +306,7 @@ sub _open_db {
return 0;
}
elsif ($err) {
- $err =~ s! at /.*!!s; # skip full trace
+ $err =~ s{ at /.*}{}s; # skip full trace
warn("bayes: Redis connection failed: $err");
return 0;
}
@@ -374,12 +470,9 @@ sub get_storage_variables {
qw{LAST_JOURNAL_SYNC NSPAM NHAM NTOKENS LAST_EXPIRE
OLDEST_TOKEN_AGE DB_VERSION LAST_JOURNAL_SYNC
LAST_ATIME_DELTA LAST_EXPIRE_REDUCE NEWEST_TOKEN_AGE};
- my @values = $self->_mget(\@tokens);
- foreach (@values) {
- $_ = 0 unless $_;
- }
-
- return @values;
+ my $values = $self->_mget(\@tokens);
+ return if !$values;
+ return map(defined $_ ? $_ : 0, @$values);
}
=head2 get_running_expire_tok
@@ -440,7 +533,8 @@ sub tok_get {
my($self, $token) = @_;
my $array = $self->tok_get_all($token);
- return !@$array ? () : (@{$array->[0]})[1,2,3];
+ return if !$array || !@$array;
+ return (@{$array->[0]})[1,2,3];
}
=head2 tok_get_all
@@ -449,24 +543,33 @@ public instance (\@) tok_get (@ $tokens)
Description:
This method retrieves the specified tokens (C<$tokens>) from storage and
-returns an array ref of arrays spam count, ham acount and last access time.
+returns a ref to an array of [token, spam count, ham count, atime] arrays.
=cut
sub tok_get_all {
- my($self, @keys) = @_;
+ my $self = shift;
+# my @keys = @_; # avoid copying strings unnecessarily
- my @t = map {"t:$_"} @keys;
- my @results = $self->_mget(\@t);
- my @values;
+ my @t = map("t:$_", @_);
+ my $results = $self->_mget(\@t);
- foreach my $token (@keys) {
- my $value = shift(@results);
+ $results or die "bayes: tok_get_all - no results from mget";
+ if (@$results != @t) {
+ warn sprintf("bayes: tok_get_all: asked for %d tokens, got %d\n",
+ scalar @t, scalar @$results);
+ return;
+ }
+
+ my $j = 0;
+ my @values;
+ foreach my $token (@_) {
+ my $value = $results->[$j++];
push(@values, [$token, _unpack_token($value), 0]) if defined $value;
}
- dbg("bayes: tok_get_all found %d tokens out of %d search keys",
- scalar(@values), scalar(@keys));
+ dbg("bayes: tok_get_all found %d tokens out of %d",
+ scalar @values, scalar @t);
return \@values;
}
@@ -495,7 +598,7 @@ public instance (Boolean) multi_tok_coun
Description:
This method takes a C<$dspam> and C<$dham> and adds it to all of the
-tokens in the C<$tokens> hash ref along with updating each tokens
+tokens in the C<$tokens> hash ref along with updating each token's
atime with C<$atime>.
=cut
@@ -507,11 +610,19 @@ sub multi_tok_count_change {
$dspam ||= 0;
$dham ||= 0;
- my @t = map {"t:$_"} keys %{$tokens};
- my @v = $self->_mget(\@t);
+ my @t = map("t:$_", keys %{$tokens});
+ my $v = $self->_mget(\@t);
+ $v or die "bayes: multi_tok_count_change - no results from mget";
+ if (@$v != @t) {
+ warn sprintf("bayes: multi_tok_count_change: asked for %d tokens, got %d\n",
+ scalar @t, scalar @$v);
+ return;
+ }
+
+ my $j = 0;
foreach my $token (@t) {
- my $value = shift(@v);
+ my $value = $v->[$j++];
my ($spam, $ham) = defined $value ? _unpack_token($value) : (0,0);
$spam += $dspam;
$ham += $dham;
@@ -570,7 +681,7 @@ sub nspam_nham_change {
die("bayes: Redis connection timed out!");
}
elsif ($err) {
- $err =~ s! at /.*!!s; # skip full trace
+ $err =~ s{ at /.*}{}s; # skip full trace
die("bayes: failed to increment nspam $ds nham $dh: $err");
}
@@ -612,8 +723,8 @@ sub tok_touch_all {
my($self, $tokens, $newatime) = @_;
# We just refresh TTL on all
- foreach (map {"t:$_"} @$tokens) {
- $self->_expire_p($_, $self->{expire_token});
+ foreach (@$tokens) {
+ $self->_expire_p("t:$_", $self->{expire_token});
}
$self->_wait_all_responses;
@@ -724,13 +835,13 @@ sub backup_database {
for (my $i = 0; $i < @$reply; $i += 10000) {
my $end = $i + 10000 > @$reply ? @$reply - 1 : $i + 9999;
my @t = @$reply[$i .. $end];
- my @v = $self->_mget(\@t);
- die "bayes: token fetch failed" unless @v;
- for (my $i = 0; $i < @v; $i++) {
- next unless defined $v[$i];
- my($ts, $th) = _unpack_token($v[$i]);
+ my $v = $self->_mget(\@t);
+ die "bayes: token fetch failed" unless $v && @$v;
+ for (my $i = 0; $i < @$v; $i++) {
+ next unless defined $v->[$i];
+ my($ts, $th) = _unpack_token($v->[$i]);
my $encoded = unpack("H*", substr($t[$i], 2));
- print "t\t$ts\t$th\t$atime\t$encoded\n";
+ printf("t\t%d\t%d\t%s\t%s\n", $ts, $th, $atime, $encoded);
}
}
});
@@ -742,11 +853,11 @@ sub backup_database {
for (my $i = 0; $i < @$reply; $i += 10000) {
my $end = $i + 10000 > @$reply ? @$reply - 1 : $i + 9999;
my @t = @$reply[$i .. $end];
- my @v = $self->_mget(\@t);
- die "bayes: seen fetch failed" unless @v;
- for (my $i = 0; $i < @v; $i++) {
- next unless defined $v[$i];
- print "s\t$v[$i]\t".substr($t[$i], 2)."\n";
+ my $v = $self->_mget(\@t);
+ die "bayes: seen fetch failed" unless $v && @$v;
+ for (my $i = 0; $i < @$v; $i++) {
+ next unless defined $v->[$i];
+ printf("s\t%s\t%s\n", $v->[$i], substr($t[$i], 2));
}
}
});
@@ -940,7 +1051,7 @@ sub db_writable {
my($self) = @_;
return $self->{is_really_open} && $self->{is_officially_open} &&
- $self->{is_writable};
+ $self->{is_writable};
}
# token marshalling format for tokens
@@ -991,7 +1102,7 @@ sub _get {
die("bayes: get timed out!");
}
elsif ($err) {
- $err =~ s! at /.*!!s; # skip full trace
+ $err =~ s{ at /.*}{}s; # skip full trace
die("bayes: get failed: $err");
}
@@ -1011,11 +1122,11 @@ sub _mget {
die("bayes: mget timed out!");
}
elsif ($err) {
- $err =~ s! at /.*!!s; # skip full trace
+ $err =~ s{ at /.*}{}s; # skip full trace
die("bayes: mget failed: $err");
}
- return @values;
+ return \@values;
}
sub _set {
@@ -1033,7 +1144,7 @@ sub _set {
die("bayes: set timed out!");
}
elsif ($err) {
- $err =~ s! at /.*!!s; # skip full trace
+ $err =~ s{ at /.*}{}s; # skip full trace
die("bayes: set failed: $err");
}
@@ -1079,11 +1190,13 @@ sub _wait_all_responses {
});
if ($self->{timer}->timed_out()) {
- die("bayes: wait_all_responses timed out!");
+ die sprintf("bayes: wait_all_responses timed out! called from line %s\n",
+ (caller)[2]);
}
elsif ($err) {
- $err =~ s! at /.*!!s; # skip full trace
- die("bayes: wait_all_responses failed: $err");
+ $err =~ s{ at /.*}{}s; # skip full trace
+ die sprintf("bayes: wait_all_responses failed: %s, called from line %s\n",
+ $err, (caller)[2]);
}
return 1;
@@ -1100,7 +1213,7 @@ sub _mset {
die("bayes: mset timed out!");
}
elsif ($err) {
- $err =~ s! at /.*!!s; # skip full trace
+ $err =~ s{ at /.*}{}s; # skip full trace
die("bayes: mset failed: $err");
}
@@ -1118,7 +1231,7 @@ sub _del {
die("bayes: del timed out!");
}
elsif ($err) {
- $err =~ s! at /.*!!s; # skip full trace
+ $err =~ s{ at /.*}{}s; # skip full trace
die("bayes: mset failed: $err");
}
Modified: spamassassin/trunk/lib/Mail/SpamAssassin/Conf.pm
URL: http://svn.apache.org/viewvc/spamassassin/trunk/lib/Mail/SpamAssassin/Conf.pm?rev=1488244&r1=1488243&r2=1488244&view=diff
==============================================================================
--- spamassassin/trunk/lib/Mail/SpamAssassin/Conf.pm (original)
+++ spamassassin/trunk/lib/Mail/SpamAssassin/Conf.pm Fri May 31 14:17:49 2013
@@ -2089,7 +2089,9 @@ equivalent to a 8Mb database file.
If enabled, the Bayes system will try to automatically expire old tokens
from the database. Auto-expiry occurs when the number of tokens in the
-database surpasses the bayes_expiry_max_db_size value.
+database surpasses the bayes_expiry_max_db_size value. If a bayes datastore
+backend does not implement individual key/value expirations, the setting
+is silently ignored.
=cut
@@ -2099,6 +2101,39 @@ database surpasses the bayes_expiry_max_
type => $CONF_TYPE_BOOL,
});
+=item bayes_token_ttl (default: 1814400, i.e. 3 weeks)
+
+If bayes_auto_expire is true and a Bayes datastore backend supports it
+(currently only Redis), this is a time-to-live / expiration time in seconds
+(since the last time they were touched) for tokens kept in a Bayes database.
+The value is observed on a best-effort basis; exact timing is not
+guaranteed. If a bayes datastore backend does not implement individual
+key/value expirations, the setting is silently ignored.
+
+=cut
+
+ push (@cmds, {
+ setting => 'bayes_token_ttl',
+ default => 3*7*24*60*60, # seconds
+ type => $CONF_TYPE_NUMERIC,
+ });
+
+=item bayes_seen_ttl (default: 691200, i.e. 8 days)
+
+If bayes_auto_expire is true and a Bayes datastore backend supports it
+(currently only Redis), this is a time-to-live / expiration time in seconds
+for 'seen' entries (i.e. mail message digests with their status) kept in a
+Bayes database. The value is observed on a best-effort basis; exact timing
+is not guaranteed.
+
+=cut
+
+ push (@cmds, {
+ setting => 'bayes_token_ttl',
+ default => 8*24*60*60, # seconds
+ type => $CONF_TYPE_NUMERIC,
+ });
+
=item bayes_learn_to_journal (default: 0)
If this option is set, whenever SpamAssassin does Bayes learning, it
Modified: spamassassin/trunk/lib/Mail/SpamAssassin/Plugin/Bayes.pm
URL: http://svn.apache.org/viewvc/spamassassin/trunk/lib/Mail/SpamAssassin/Plugin/Bayes.pm?rev=1488244&r1=1488243&r2=1488244&view=diff
==============================================================================
--- spamassassin/trunk/lib/Mail/SpamAssassin/Plugin/Bayes.pm (original)
+++ spamassassin/trunk/lib/Mail/SpamAssassin/Plugin/Bayes.pm Fri May 31 14:17:49 2013
@@ -249,6 +249,8 @@ sub finish {
%{$self} = ();
}
+###########################################################################
+
# Plugin hook.
# Return this implementation object, for callers that need to know
# it. TODO: callers shouldn't *need* to know it!
@@ -258,6 +260,31 @@ sub learner_get_implementation { return
###########################################################################
+# Plugin hook.
+# Called in the parent process shortly before forking off child processes.
+sub prefork_init {
+ my ($self) = @_;
+
+ if ($self->{store} && $self->{store}->UNIVERSAL::can('prefork_init')) {
+ $self->{store}->prefork_init;
+ }
+}
+
+###########################################################################
+
+# Plugin hook.
+# Called in a child process shortly after being spawned.
+sub spamd_child_init {
+ my ($self) = @_;
+
+ if ($self->{store} && $self->{store}->UNIVERSAL::can('spamd_child_init')) {
+ $self->{store}->spamd_child_init;
+ }
+}
+
+###########################################################################
+
+# Plugin hook.
sub check_bayes {
my ($self, $pms, $fulltext, $min, $max) = @_;
@@ -349,6 +376,7 @@ sub learn_message {
eval {
local $SIG{'__DIE__'}; # do not run user die() traps in here
+ my $timer = $self->{main}->time_method("b_learn");
my $ok;
if ($self->{main}->{learn_to_journal}) {
@@ -484,6 +512,7 @@ sub forget_message {
# synchronously
eval {
local $SIG{'__DIE__'}; # do not run user die() traps in here
+ my $timer = $self->{main}->time_method("b_learn");
my $ok;
if ($self->{main}->{learn_to_journal}) {