You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spamassassin.apache.org by mm...@apache.org on 2013/06/18 15:55:21 UTC
svn commit: r1494137 - in /spamassassin/trunk/lib/Mail/SpamAssassin:
BayesStore/Redis.pm Util/DependencyInfo.pm
Author: mmartinec
Date: Tue Jun 18 13:55:21 2013
New Revision: 1494137
URL: http://svn.apache.org/r1494137
Log:
Bug 6942: Redis bayes storage - replace packed s/h counts with redis hashes
Modified:
spamassassin/trunk/lib/Mail/SpamAssassin/BayesStore/Redis.pm
spamassassin/trunk/lib/Mail/SpamAssassin/Util/DependencyInfo.pm
Modified: spamassassin/trunk/lib/Mail/SpamAssassin/BayesStore/Redis.pm
URL: http://svn.apache.org/viewvc/spamassassin/trunk/lib/Mail/SpamAssassin/BayesStore/Redis.pm?rev=1494137&r1=1494136&r2=1494137&view=diff
==============================================================================
--- spamassassin/trunk/lib/Mail/SpamAssassin/BayesStore/Redis.pm (original)
+++ spamassassin/trunk/lib/Mail/SpamAssassin/BayesStore/Redis.pm Tue Jun 18 13:55:21 2013
@@ -81,7 +81,8 @@ BEGIN {
@ISA = qw( Mail::SpamAssassin::BayesStore );
}
-use constant HAS_REDIS => eval { require Redis; };
+# Support for "SCRIPT LOAD" command is needed, provided by Redis version 1.954
+use constant HAS_REDIS => eval { require Redis; Redis->VERSION(1.954) };
=head1 METHODS
@@ -123,10 +124,17 @@ sub new {
}
}
- $self->{expire_token} = $bconf->{bayes_token_ttl};
- $self->{expire_seen} = $bconf->{bayes_seen_ttl};
if (!$bconf->{bayes_auto_expire}) {
$self->{expire_token} = $self->{expire_seen} = undef;
+ warn("bayes: the setting bayes_auto_expire is off, this is ".
+ "not a recommended setting for the Redis bayes backend");
+ } else {
+ $self->{expire_token} = $bconf->{bayes_token_ttl};
+ undef $self->{expire_token} if $self->{expire_token} &&
+ $self->{expire_token} < 0;
+ $self->{expire_seen} = $bconf->{bayes_seen_ttl};
+ undef $self->{expire_seen} if $self->{expire_seen} &&
+ $self->{expire_seen} < 0;
}
$self->{supported_db_version} = 3;
@@ -311,27 +319,54 @@ sub _open_db {
return 0;
}
+ my $have_lua = $self->{have_lua};
+ if (!$self->{redis_server_version}) {
+ my $info = $self->{info} = $self->{redis}->info;
+ if ($info) {
+ $self->{redis_server_version} = $info->{redis_version};
+ $have_lua = $self->{have_lua} = 1 if exists $info->{used_memory_lua};
+
+ dbg("bayes: redis server version %s, memory used %.1f MiB%s",
+ $info->{redis_version}, $info->{used_memory}/1024/1024,
+ !$have_lua ? '' : ", Lua is available");
+ }
+ if (!$have_lua) {
+ warn "bayes: Redis server does not support Lua, ".
+ "upgrade or expect slower operation\n";
+ }
+ }
+
$self->{db_version} = $self->_get('v:DB_VERSION');
if (!$self->{db_version}) {
- $self->{db_version} = $self->DB_VERSION;
- my $ret = $self->_mset([
- 'v:DB_VERSION', $self->{db_version},
- 'v:NSPAM', 0,
- 'v:NHAM', 0,
- ]);
- unless ($ret) {
- warn("bayes: failed to initialize database");
- return 0;
- }
- dbg("bayes: initialized empty database, version $self->{db_version}");
+ $self->{db_version} = $self->DB_VERSION;
+ my $ret = $self->{redis}->mset(['v:DB_VERSION', $self->{db_version},
+ 'v:NSPAM', 0,
+ 'v:NHAM', 0,
+ 'v:TOKEN_FORMAT', 2,
+ ]);
+ unless ($ret) {
+ warn("bayes: failed to initialize database");
+ return 0;
+ }
+ dbg("bayes: initialized empty database, version $self->{db_version}");
}
else {
- dbg("bayes: found bayes db version $self->{db_version}");
+ dbg("bayes: found bayes db version %s", $self->{db_version});
if ($self->{db_version} ne $self->DB_VERSION) {
warn("bayes: bayes db version $self->{db_version} not supported, aborting\n");
return 0;
}
+ my $token_format = $self->_get('v:TOKEN_FORMAT') || 0;
+ if ($token_format < 2) {
+ warn("bayes: bayes old token format $token_format not supported, ".
+ "consider backup/restore or initialize a database\n");
+ return 0;
+ }
+ }
+
+ if ($have_lua && !defined $self->{multi_hmget_script}) {
+ $self->_define_lua_scripts;
}
$self->{is_really_open} = 1;
@@ -469,7 +504,8 @@ sub get_storage_variables {
my @tokens = map {"v:$_"}
qw{LAST_JOURNAL_SYNC NSPAM NHAM NTOKENS LAST_EXPIRE
OLDEST_TOKEN_AGE DB_VERSION LAST_JOURNAL_SYNC
- LAST_ATIME_DELTA LAST_EXPIRE_REDUCE NEWEST_TOKEN_AGE};
+ LAST_ATIME_DELTA LAST_EXPIRE_REDUCE NEWEST_TOKEN_AGE
+ TOKEN_FORMAT};
my $values = $self->_mget(\@tokens);
return if !$values;
return map(defined $_ ? $_ : 0, @$values);
@@ -518,7 +554,7 @@ and expire is currently running.
sub remove_running_expire_tok {
return 1;
}
-
+
=head2 tok_get
public instance (Integer, Integer, Integer) tok_get (String $token)
@@ -551,25 +587,41 @@ sub tok_get_all {
my $self = shift;
# my @keys = @_; # avoid copying strings unnecessarily
- my @t = map("t:$_", @_);
- my $results = $self->_mget(\@t);
+ my @values;
+ my $r = $self->{redis};
- $results or die "bayes: tok_get_all - no results from mget";
- if (@$results != @t) {
- warn sprintf("bayes: tok_get_all: asked for %d tokens, got %d\n",
- scalar @t, scalar @$results);
- return;
- }
+ if (! $self->{have_lua} ) {
+ foreach my $token (@_) {
+ $r->hmget('w:'.$token, 's', 'h', sub {
+ my($values, $error) = @_;
+ return if !$values || @$values != 2;
+ push(@values, [$token, $values->[0]||0, $values->[1]||0, 0]);
+ 1;
+ });
+ }
+ $self->_wait_all_responses;
- my $j = 0;
- my @values;
- foreach my $token (@_) {
- my $value = $results->[$j++];
- push(@values, [$token, _unpack_token($value), 0]) if defined $value;
+ } else { # have Lua, faster
+ my @results;
+ eval {
+ @results = $r->evalsha($self->{multi_hmget_script}, scalar @_, @_);
+ 1;
+ } or do { # Lua script probably not cached, define again and re-try
+ $@ =~ /^\Q[evalsha] NOSCRIPT\E/ or die "bayes: Redis LUA error: $@\n";
+ $self->_define_lua_scripts;
+ @results = $r->evalsha($self->{multi_hmget_script}, scalar @_, @_);
+ };
+ @results == @_
+ or die sprintf("bayes: tok_get_all got %d results, expected %d\n",
+ scalar @results, scalar @_);
+ foreach my $token (@_) {
+ my($s,$h) = split(m{/}, shift @results, 2);
+ push(@values, [$token, 0+$s, 0+$h, 0]) if $s || $h;
+ }
}
dbg("bayes: tok_get_all found %d tokens out of %d",
- scalar @values, scalar @t);
+ scalar @values, scalar @_);
return \@values;
}
@@ -606,36 +658,67 @@ atime with C<$atime>.
sub multi_tok_count_change {
my($self, $dspam, $dham, $tokens, $newatime) = @_;
- # Make sure we have some values
+  # Adds $dspam / $dham to the spam / ham counters of every token that is a
+  # key of %$tokens; the counters live in fields 's' and 'h' of the redis
+  # hash "w:<token>".  A counter that drops to zero or below is deleted,
+  # otherwise the TTL of the key is refreshed when token expiry is enabled.
+  # $newatime is not used here.  Dies on a Redis/Lua failure, returns 1
+  # otherwise.
+
+  # turn undef or an empty string into a 0
$dspam ||= 0;
- $dham ||= 0;
+  $dham ||= 0;
+  # the increment must be an integer, otherwise redis returns an error
+  $dspam = int $dspam;
+  $dham = int $dham;
- my @t = map("t:$_", keys %{$tokens});
- my $v = $self->_mget(\@t);
+  # time-to-live, in seconds; 0 stands for "no expiry", so that an undef
+  # is never handed to the server as a script argument
+  my $ttl = $self->{expire_token} || 0;
+  my $r = $self->{redis};
- $v or die "bayes: multi_tok_count_change - no results from mget";
- if (@$v != @t) {
- warn sprintf("bayes: multi_tok_count_change: asked for %d tokens, got %d\n",
- scalar @t, scalar @$v);
- return;
- }
-
- my $j = 0;
- foreach my $token (@t) {
- my $value = $v->[$j++];
- my ($spam, $ham) = defined $value ? _unpack_token($value) : (0,0);
- $spam += $dspam;
- $ham += $dham;
- $spam = 0 if $spam < 0;
- $ham = 0 if $ham < 0;
- if ($ham == 0 && $spam == 0) {
- $self->_del_p($token);
- } else {
- $self->_set_p($token, _pack_token($spam, $ham), $self->{expire_token});
+  dbg("bayes: multi_tok_count_change learning %d spam, %d ham",
+      $dspam, $dham);
+
+  if ($self->{have_lua}) {
+
+    # build the argument list once, so that the first attempt and the
+    # re-try are guaranteed to send the very same keys in the same order
+    my @token_list = keys %$tokens;
+    my $ntokens = scalar @token_list;
+    my @script_args = ($ntokens, @token_list, $dspam, $dham, $ttl);
+    my $cnt;
+    eval {
+      $cnt = $r->evalsha($self->{multi_hincrby}, @script_args);
+      1;
+    } or do {  # Lua script probably not cached, define again and re-try
+      $@ =~ /^\Q[evalsha] NOSCRIPT\E/ or die "bayes: Redis LUA error: $@\n";
+      $self->_define_lua_scripts;
+      $cnt = $r->evalsha($self->{multi_hincrby}, @script_args);
+    };
+    $cnt == $ntokens
+      or die sprintf("bayes: multi_tok_count_change got %d, expected %d\n",
+                     $cnt, $ntokens);
+
+  } else {  # no Lua, slower
+
+    if ($dspam > 0 || $dham > 0) {  # learning
+      for my $token (keys %$tokens) {
+        $self->_hincrby_p('w:'.$token, 's', $dspam)  if $dspam > 0;
+        $self->_hincrby_p('w:'.$token, 'h', $dham)   if $dham > 0;
+        $self->_expire_p('w:'.$token, $ttl)  if $ttl;
+      }
+      $self->_wait_all_responses;
}
- }
- $self->_wait_all_responses;
+    if ($dspam < 0 || $dham < 0) {  # unlearning - rare, not as efficient
+      for my $token (keys %$tokens) {
+        my $key = 'w:'.$token;
+        for my $change (['s', $dspam], ['h', $dham]) {
+          my($field, $delta) = @$change;
+          next if $delta >= 0;
+          # Talk to the redis client directly: the counter value after the
+          # decrement is needed to choose between hdel and expire, and
+          # hdel/expire are client commands, not methods of this object.
+          my $result = $r->hincrby($key, $field, $delta);
+          if (!$result || $result <= 0) {
+            $r->hdel($key, $field);
+          } elsif ($ttl) {
+            $r->expire($key, $ttl);
+          }
+        }
+      }
+    }
+  }
return 1;
}
@@ -722,15 +805,34 @@ if the existing token atime is < C<$atim
sub tok_touch_all {
my($self, $tokens, $newatime) = @_;
- return 1 unless defined $self->{expire_token};
+  # Refreshes the time-to-live of the keys "w:<token>" for every token in
+  # @$tokens.  $newatime is not used here.  Does nothing when token expiry
+  # is disabled.  Dies on a Redis/Lua failure, returns 1 otherwise.
+
+  my $ttl = $self->{expire_token};  # time-to-live, in seconds
+  # Only a positive TTL may reach EXPIRE: a zero (or negative) timeout
+  # would make the server drop the keys instead of refreshing them.
+  return 1 unless $ttl && $ttl > 0;
+
+  dbg("bayes: tok_touch_all setting expire to %s on %d tokens",
+      $ttl, scalar @$tokens);
# We just refresh TTL on all
- foreach (@$tokens) {
- $self->_expire_p("t:$_", $self->{expire_token});
+  if (! $self->{have_lua} ) {
+    # one pipelined EXPIRE per token, replies are collected afterwards
+    $self->_expire_p("w:$_", $ttl) for @$tokens;
+    $self->_wait_all_responses;
+
+  } else {  # have Lua, faster
+    my $r = $self->{redis};
+    my @script_args = (scalar @$tokens, @$tokens, $ttl);
+    my $cnt;
+    eval {
+      $cnt = $r->evalsha($self->{multi_expire_script}, @script_args);
+      1;
+    } or do {  # Lua script probably not cached, define again and re-try
+      $@ =~ /^\Q[evalsha] NOSCRIPT\E/ or die "bayes: Redis LUA error: $@\n";
+      $self->_define_lua_scripts;
+      $cnt = $r->evalsha($self->{multi_expire_script}, @script_args);
+    };
+    # the script reports the number of keys it was given
+    $cnt == @$tokens
+      or die sprintf("bayes: tok_touch_all got %d, expected %d\n",
+                     $cnt, scalar @$tokens);
}
-
- $self->_wait_all_responses;
-
return 1;
}
@@ -827,43 +929,60 @@ sub backup_database {
print "v\t$vars[1]\tnum_spam\n";
print "v\t$vars[2]\tnum_nonspam\n";
- # Process tokens in chunks of 10000 to save some memory on large sets
- # (sadly it's impossible to prevent Redis-module itself keeping all
- # resulting keys in memory)
-
- $self->{redis}->keys('t:*', sub {
- my ($reply, $error) = @_;
- die "bayes: token keys fetch failed: $error" if defined $error;
- for (my $i = 0; $i < @$reply; $i += 10000) {
- my $end = $i + 10000 > @$reply ? @$reply - 1 : $i + 9999;
- my @t = @$reply[$i .. $end];
- my $v = $self->_mget(\@t);
- die "bayes: token fetch failed" unless $v && @$v;
- for (my $i = 0; $i < @$v; $i++) {
- next unless defined $v->[$i];
- my($ts, $th) = _unpack_token($v->[$i]);
- my $encoded = unpack("H*", substr($t[$i], 2));
- printf("t\t%d\t%d\t%s\t%s\n", $ts, $th, $atime, $encoded);
+ my $r = $self->{redis};
+
+ # Sadly it's impossible to prevent Redis-module itself keeping all
+ # resulting keys in memory.
+ my @keys;
+
+ # let's get past this terrible command as fast as possible
+ @keys = $r->keys('w:*');
+ dbg("bayes: fetched %d token keys", scalar @keys);
+
+ # process tokens in chunks of 1000
+ for (my $i = 0; $i <= $#keys; $i += 1000) {
+ my $end = $i + 999 >= $#keys ? $#keys : $i + 999;
+
+ if ($self->{have_lua}) {
+ my @tokens = map(substr($_,2), @keys[$i .. $end]); # strip leading "w:"
+ my @results = $r->evalsha($self->{multi_hmget_script},
+ scalar @tokens, @tokens);
+ foreach my $token (@tokens) {
+ my($s,$h) = split(m{/}, shift @results, 2);
+ my $encoded = unpack("H*", $token);
+ printf("t\t%d\t%d\t%s\t%s\n", $s, $h, $atime, $encoded) if $s || $h;
}
- }
- });
- $self->{redis}->wait_all_responses;
- $self->{redis}->keys('s:*', sub {
- my ($reply, $error) = @_;
- die "bayes: seen keys fetch failed: $error" if defined $error;
- for (my $i = 0; $i < @$reply; $i += 10000) {
- my $end = $i + 10000 > @$reply ? @$reply - 1 : $i + 9999;
- my @t = @$reply[$i .. $end];
- my $v = $self->_mget(\@t);
- die "bayes: seen fetch failed" unless $v && @$v;
- for (my $i = 0; $i < @$v; $i++) {
- next unless defined $v->[$i];
- printf("s\t%s\t%s\n", $v->[$i], substr($t[$i], 2));
+ } else { # no Lua, slower
+ for (my $j = $i; $j <= $end; $j++) {
+ my $token = $keys[$j];
+ $r->hmget($token, 's', 'h', sub {
+ my($values, $error) = @_;
+ return if !$values || @$values != 2;
+ return if !$values->[0] && !$values->[1];
+ my $encoded = unpack("H*", substr($token, 2));
+ printf("t\t%d\t%d\t%s\t%s\n",
+ $values->[0]||0, $values->[1]||0, $atime, $encoded);
+ 1;
+ });
}
+ $self->_wait_all_responses;
}
- });
- $self->{redis}->wait_all_responses;
+ }
+
+ @keys = $r->keys('s:*');
+ dbg("bayes: fetched %d seen keys", scalar @keys);
+
+ for (my $i = 0; $i <= $#keys; $i += 1000) {
+ my $end = $i + 999 >= $#keys ? $#keys : $i + 999;
+ my @t = @keys[$i .. $end];
+ my $v = $self->_mget(\@t);
+ die "bayes: seen fetch failed" unless $v && @$v;
+ for (my $i = 0; $i < @$v; $i++) {
+ next unless defined $v->[$i];
+ printf("s\t%s\t%s\n", $v->[$i], substr($t[$i], 2));
+ }
+ }
$self->untie_db();
@@ -959,9 +1078,12 @@ sub restore_database {
# turn unpacked binary token back into binary value
$token = pack("H*",$token);
}
+ my $key = 'w:'.$token;
+ $self->_hincrby_p($key, 's', int $spam_count) if $spam_count > 0;
+ $self->_hincrby_p($key, 'h', int $ham_count) if $ham_count > 0;
+ $self->_expire_p($key, $self->{expire_token})
+ if defined $self->{expire_token};
- $self->_set_p("t:$token", _pack_token($spam_count, $ham_count),
- $self->{expire_token});
$self->{redis}->wait_all_responses if ++$q_cnt % 10000 == 0;
$token_count++;
} elsif ($line =~ /^s\s+/) { # seen line
@@ -1056,41 +1178,64 @@ sub db_writable {
$self->{is_writable};
}
-# token marshalling format for tokens
-# pack CC for values <256, VV for the rest, keep it simple
-
-sub _unpack_token {
- my $value = shift;
-
- my ($ts, $th);
-
- if (length($value) == 2) {
- ($ts, $th) = unpack("CC", $value);
- }
- elsif (length($value) == 8) {
- ($ts, $th) = unpack("VV", $value);
- }
- else {
- dbg("bayes: unknown token format: ".unpack("H*", $value));
- }
+#
+# Redis functions
+#
- return ($ts||0, $th||0);
-}
+# Loads the server-side Lua scripts with SCRIPT LOAD and remembers the
+# returned script handles in $self->{multi_hmget_script},
+# $self->{multi_expire_script} and $self->{multi_hincrby}, for use with
+# EVALSHA.  All scripts receive bare tokens as KEYS and prepend the "w:"
+# key prefix themselves.  Returns 1.
+sub _define_lua_scripts {
+  my $self = shift;
+  dbg("bayes: defining Lua scripts");
+  my $r = $self->{redis};
-sub _pack_token {
- my($ts, $th) = @_;
+  # KEYS: tokens; returns one "spam/ham" string per token, in order
+  $self->{multi_hmget_script} = $r->script_load(<<'END');
+    local r = {}
+    for j = 1, #KEYS do
+      local sh = redis.call("hmget", "w:" .. KEYS[j], "s", "h")
+      -- returns counts as "spam/ham" pairs, a missing field counts as 0
+      table.insert(r, (sh[1] or 0) .. "/" .. (sh[2] or 0))
+    end
+    return r
+END
+
+  # KEYS: tokens, ARGV[1]: ttl in seconds; returns the number of keys given
+  $self->{multi_expire_script} = $r->script_load(<<'END');
+    local ttl = ARGV[1]
+    for j = 1, #KEYS do
+      redis.call("expire", "w:" .. KEYS[j], ttl)
+    end
+    return #KEYS
+END
+
+  # KEYS: tokens, ARGV: spam increment, ham increment, ttl in seconds;
+  # returns the number of keys given
+  $self->{multi_hincrby} = $r->script_load(<<'END');
+    -- ARGV elements arrive as strings: convert them, otherwise a test like
+    -- s ~= 0 compares a string with a number and is always true
+    local s = tonumber(ARGV[1]) or 0
+    local h = tonumber(ARGV[2]) or 0
+    local ttl = tonumber(ARGV[3])
+    -- a missing, empty or non-numeric ttl means "do not set expiry"
+    local set_expire = ttl ~= nil and ttl > 0
+    if s ~= 0 then
+      for j = 1, #KEYS do
+        local token = "w:" .. KEYS[j]
+        local cnt = redis.call("hincrby", token, "s", s)
+        if cnt <= 0 then
+          redis.call("hdel", token, "s")
+        elseif set_expire then
+          redis.call("expire", token, ttl)
+        end
+      end
+    end
+    if h ~= 0 then
+      for j = 1, #KEYS do
+        local token = "w:" .. KEYS[j]
+        local cnt = redis.call("hincrby", token, "h", h)
+        if cnt <= 0 then
+          redis.call("hdel", token, "h")
+        elseif set_expire then
+          redis.call("expire", token, ttl)
+        end
+      end
+    end
+    return #KEYS
+END
- if ($ts < 256 && $th < 256) {
- return pack("CC", $ts, $th);
- } else {
- return pack("VV", $ts, $th);
- }
+  1;
}
-#
-# Redis functions
-#
-
sub _get {
my ($self, $key) = @_;
@@ -1131,6 +1276,25 @@ sub _mget {
return \@values;
}
+# Synchronous HMGET of @fields from the hash stored at $key, run under the
+# object's timer.  Returns whatever the redis client hands back in scalar
+# context; dies on a timeout or on a client error.
+sub _hmget {
+  my ($self, $key, @fields) = @_;
+
+  my $timer = $self->{timer};
+  my $reply;
+  my $failure = $timer->run_and_catch(sub {
+    $reply = $self->{redis}->hmget($key, @fields);
+  });
+
+  die("bayes: hmget timed out!") if $timer->timed_out();
+
+  if ($failure) {
+    $failure =~ s{ at /.*}{}s;  # skip full trace
+    die("bayes: hmget failed: $failure");
+  }
+
+  return $reply;
+}
+
sub _set {
my ($self, $key, $value, $expire) = @_;
@@ -1153,6 +1317,24 @@ sub _set {
return 1;
}
+# Synchronous HINCRBY: adds $incr to field $field of the hash stored at
+# $key, run under the object's timer.  Returns the value reported by the
+# redis client, i.e. the counter after the increment.  Dies on a timeout
+# or on a client error.
+sub _hincrby {
+  my ($self, $key, $field, $incr) = @_;
+
+  my $value;
+  my $err = $self->{timer}->run_and_catch(sub {
+    $value = $self->{redis}->hincrby($key, $field, $incr);
+  });
+
+  if ($self->{timer}->timed_out()) {
+    die("bayes: hincrby timed out!");
+  }
+  elsif ($err) {
+    $err =~ s{ at /.*}{}s;  # skip full trace
+    die("bayes: hincrby failed: $err");
+  }
+
+  # Hand back the new counter rather than a constant 1: callers inspect it
+  # to detect counts that dropped to zero or below.
+  return $value;
+}
+
# Pipelined set, must call _wait_all_responses after
sub _set_p {
my ($self, $key, $value, $expire) = @_;
@@ -1166,6 +1348,15 @@ sub _set_p {
return 1;
}
+# Pipelined hincrby: queues the command with a no-op reply callback and
+# returns 1 straight away; _wait_all_responses must be called afterwards.
+sub _hincrby_p {
+  my $self = shift;
+  my ($key, $field, $incr) = @_;
+
+  my $ignore_reply = sub {};
+  $self->{redis}->hincrby($key, $field, $incr, $ignore_reply);
+
+  return 1;
+}
+
# Pipelined del, must call _wait_all_responses after
sub _del_p {
my ($self, $key) = @_;
@@ -1206,24 +1397,6 @@ sub _wait_all_responses {
return 1;
}
-sub _mset {
- my ($self, $values) = @_;
-
- my $err = $self->{timer}->run_and_catch(sub {
- $self->{redis}->mset(@$values);
- });
-
- if ($self->{timer}->timed_out()) {
- die("bayes: mset timed out!");
- }
- elsif ($err) {
- $err =~ s{ at /.*}{}s; # skip full trace
- die("bayes: mset failed: $err");
- }
-
- return 1;
-}
-
sub _del {
my ($self, $key) = @_;
Modified: spamassassin/trunk/lib/Mail/SpamAssassin/Util/DependencyInfo.pm
URL: http://svn.apache.org/viewvc/spamassassin/trunk/lib/Mail/SpamAssassin/Util/DependencyInfo.pm?rev=1494137&r1=1494136&r2=1494137&view=diff
==============================================================================
--- spamassassin/trunk/lib/Mail/SpamAssassin/Util/DependencyInfo.pm (original)
+++ spamassassin/trunk/lib/Mail/SpamAssassin/Util/DependencyInfo.pm Tue Jun 18 13:55:21 2013
@@ -217,6 +217,12 @@ $have_sha ? {
your database.',
},
{
+ module => 'Redis',
+ version => 1.954,
+ desc => 'If you intend to use SpamAssassin with a Redis database backend for
+ Bayes storage, you will need to have this module installed.',
+},
+{
module => 'Getopt::Long',
version => '2.32', # min version was included in 5.8.0, which works
desc => 'The "sa-stats.pl" program included in "tools", used to generate