You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spamassassin.apache.org by mm...@apache.org on 2013/06/18 15:55:21 UTC

svn commit: r1494137 - in /spamassassin/trunk/lib/Mail/SpamAssassin: BayesStore/Redis.pm Util/DependencyInfo.pm

Author: mmartinec
Date: Tue Jun 18 13:55:21 2013
New Revision: 1494137

URL: http://svn.apache.org/r1494137
Log:
Bug 6942: Redis bayes storage - replace packed s/h counts with redis hashes

Modified:
    spamassassin/trunk/lib/Mail/SpamAssassin/BayesStore/Redis.pm
    spamassassin/trunk/lib/Mail/SpamAssassin/Util/DependencyInfo.pm

Modified: spamassassin/trunk/lib/Mail/SpamAssassin/BayesStore/Redis.pm
URL: http://svn.apache.org/viewvc/spamassassin/trunk/lib/Mail/SpamAssassin/BayesStore/Redis.pm?rev=1494137&r1=1494136&r2=1494137&view=diff
==============================================================================
--- spamassassin/trunk/lib/Mail/SpamAssassin/BayesStore/Redis.pm (original)
+++ spamassassin/trunk/lib/Mail/SpamAssassin/BayesStore/Redis.pm Tue Jun 18 13:55:21 2013
@@ -81,7 +81,8 @@ BEGIN {
   @ISA = qw( Mail::SpamAssassin::BayesStore );
 }
 
-use constant HAS_REDIS => eval { require Redis; };
+# Support for "SCRIPT LOAD" command is needed, provided by Redis version 1.954
+use constant HAS_REDIS => eval { require Redis; Redis->VERSION(1.954) };
 
 =head1 METHODS
 
@@ -123,10 +124,17 @@ sub new {
     }
   }
 
-  $self->{expire_token} = $bconf->{bayes_token_ttl};
-  $self->{expire_seen}  = $bconf->{bayes_seen_ttl};
   if (!$bconf->{bayes_auto_expire}) {
     $self->{expire_token} = $self->{expire_seen} = undef;
+    warn("bayes: the setting bayes_auto_expire is off, this is ".
+         "not a recommended setting for the Redis bayes backend");
+  } else {
+    $self->{expire_token} = $bconf->{bayes_token_ttl};
+    undef $self->{expire_token}  if $self->{expire_token} &&
+                                    $self->{expire_token} < 0;
+    $self->{expire_seen}  = $bconf->{bayes_seen_ttl};
+    undef $self->{expire_seen}   if $self->{expire_seen} &&
+                                    $self->{expire_seen} < 0;
   }
 
   $self->{supported_db_version} = 3;
@@ -311,27 +319,54 @@ sub _open_db {
     return 0;
   }
 
+  my $have_lua = $self->{have_lua};
+  if (!$self->{redis_server_version}) {
+    my $info = $self->{info} = $self->{redis}->info;
+    if ($info) {
+      $self->{redis_server_version} = $info->{redis_version};
+      $have_lua = $self->{have_lua} = 1  if exists $info->{used_memory_lua};
+
+      dbg("bayes: redis server version %s, memory used %.1f MiB%s",
+          $info->{redis_version}, $info->{used_memory}/1024/1024,
+          !$have_lua ? '' : ", Lua is available");
+    }
+    if (!$have_lua) {
+      warn "bayes: Redis server does not support Lua, ".
+           "upgrade or expect slower operation\n";
+    }
+  }
+
   $self->{db_version} = $self->_get('v:DB_VERSION');
 
   if (!$self->{db_version}) {
-      $self->{db_version} = $self->DB_VERSION;
-      my $ret = $self->_mset([
-        'v:DB_VERSION', $self->{db_version},
-        'v:NSPAM', 0,
-        'v:NHAM', 0,
-      ]);
-      unless ($ret) {
-          warn("bayes: failed to initialize database");
-          return 0;
-      }
-      dbg("bayes: initialized empty database, version $self->{db_version}");
+    $self->{db_version} = $self->DB_VERSION;
+    my $ret = $self->{redis}->mset(['v:DB_VERSION', $self->{db_version},
+                                    'v:NSPAM', 0,
+                                    'v:NHAM', 0,
+                                    'v:TOKEN_FORMAT', 2,
+                                   ]);
+    unless ($ret) {
+      warn("bayes: failed to initialize database");
+      return 0;
+    }
+    dbg("bayes: initialized empty database, version $self->{db_version}");
   }
   else {
-    dbg("bayes: found bayes db version $self->{db_version}");
+    dbg("bayes: found bayes db version %s", $self->{db_version});
     if ($self->{db_version} ne $self->DB_VERSION) {
       warn("bayes: bayes db version $self->{db_version} not supported, aborting\n");
       return 0;
     }
+    my $token_format = $self->_get('v:TOKEN_FORMAT') || 0;
+    if ($token_format < 2) {
+      warn("bayes: bayes old token format $token_format not supported, ".
+           "consider backup/restore or initialize a database\n");
+      return 0;
+    }
+  }
+
+  if ($have_lua && !defined $self->{multi_hmget_script}) {
+    $self->_define_lua_scripts;
   }
 
   $self->{is_really_open} = 1;
@@ -469,7 +504,8 @@ sub get_storage_variables {
   my @tokens = map {"v:$_"}
                qw{LAST_JOURNAL_SYNC NSPAM NHAM NTOKENS LAST_EXPIRE
                   OLDEST_TOKEN_AGE DB_VERSION LAST_JOURNAL_SYNC
-                  LAST_ATIME_DELTA LAST_EXPIRE_REDUCE NEWEST_TOKEN_AGE};
+                  LAST_ATIME_DELTA LAST_EXPIRE_REDUCE NEWEST_TOKEN_AGE
+                  TOKEN_FORMAT};
   my $values = $self->_mget(\@tokens);
   return if !$values;
   return map(defined $_ ? $_ : 0, @$values);
@@ -518,7 +554,7 @@ and expire is currently running.
 sub remove_running_expire_tok {
   return 1;
 }
-  
+
 =head2 tok_get
 
 public instance (Integer, Integer, Integer) tok_get (String $token)
@@ -551,25 +587,41 @@ sub tok_get_all {
   my $self = shift;
 # my @keys = @_;  # avoid copying strings unnecessarily
 
-  my @t = map("t:$_", @_);
-  my $results = $self->_mget(\@t);
+  my @values;
+  my $r = $self->{redis};
 
-  $results or die "bayes: tok_get_all - no results from mget";
-  if (@$results != @t) {
-    warn sprintf("bayes: tok_get_all: asked for %d tokens, got %d\n",
-                 scalar @t, scalar @$results);
-    return;
-  }
+  if (! $self->{have_lua} ) {
+    foreach my $token (@_) {
+      $r->hmget('w:'.$token, 's', 'h', sub {
+        my($values, $error) = @_;
+        return if !$values || @$values != 2;
+        push(@values, [$token, $values->[0]||0, $values->[1]||0, 0]);
+        1;
+      });
+    }
+    $self->_wait_all_responses;
 
-  my $j = 0;
-  my @values;
-  foreach my $token (@_) {
-    my $value = $results->[$j++];
-    push(@values, [$token, _unpack_token($value), 0]) if defined $value;
+  } else {  # have Lua, faster
+    my @results;
+    eval {
+      @results = $r->evalsha($self->{multi_hmget_script}, scalar @_, @_);
+      1;
+    } or do {  # Lua script probably not cached, define again and re-try
+      $@ =~ /^\Q[evalsha] NOSCRIPT\E/ or die "bayes: Redis LUA error: $@\n";
+      $self->_define_lua_scripts;
+      @results = $r->evalsha($self->{multi_hmget_script}, scalar @_, @_);
+    };
+    @results == @_
+      or die sprintf("bayes: tok_get_all got %d results, expected %d\n",
+                     scalar @results, scalar @_);
+    foreach my $token (@_) {
+      my($s,$h) = split(m{/}, shift @results, 2);
+      push(@values, [$token, 0+$s, 0+$h, 0])  if $s || $h;
+    }
   }
 
   dbg("bayes: tok_get_all found %d tokens out of %d",
-      scalar @values, scalar @t);
+      scalar @values, scalar @_);
 
   return \@values;
 }
@@ -606,36 +658,67 @@ atime with C<$atime>.
 sub multi_tok_count_change {
   my($self, $dspam, $dham, $tokens, $newatime) = @_;
 
-  # Make sure we have some values
+  # turn undef or an empty string into a 0
   $dspam ||= 0;
-  $dham ||= 0;
+  $dham  ||= 0;
+  # the increment must be an integer, otherwise redis returns an error
 
-  my @t = map("t:$_", keys %{$tokens});
-  my $v = $self->_mget(\@t);
+  my $ttl = $self->{expire_token};  # time-to-live in seconds, undef: no expiry
 
-  $v or die "bayes: multi_tok_count_change - no results from mget";
-  if (@$v != @t) {
-    warn sprintf("bayes: multi_tok_count_change: asked for %d tokens, got %d\n",
-                 scalar @t, scalar @$v);
-    return;
-  }
-
-  my $j = 0;
-  foreach my $token (@t) {
-    my $value = $v->[$j++];
-    my ($spam, $ham) = defined $value ? _unpack_token($value) : (0,0);
-    $spam += $dspam;
-    $ham += $dham;
-    $spam = 0 if $spam < 0;
-    $ham = 0 if $ham < 0;
-    if ($ham == 0 && $spam == 0) {
-      $self->_del_p($token);
-    } else {
-      $self->_set_p($token, _pack_token($spam, $ham), $self->{expire_token});
+  dbg("bayes: multi_tok_count_change learning %d spam, %d ham",
+      $dspam, $dham);
+
+  if ($self->{have_lua}) {
+    # script arguments: spam increment, ham increment, ttl (0 if no expiry)
+    my $r = $self->{redis};
+    my $ntokens = scalar keys %$tokens;
+    my $cnt;
+    eval {
+      $cnt = $r->evalsha($self->{multi_hincrby},
+                         $ntokens, keys %$tokens, $dspam, $dham, $ttl || 0);
+      1;
+    } or do {  # Lua script probably not cached, define again and re-try
+      $@ =~ /^\Q[evalsha] NOSCRIPT\E/ or die "bayes: Redis LUA error: $@\n";
+      $self->_define_lua_scripts;
+      $cnt = $r->evalsha($self->{multi_hincrby},
+                         $ntokens, keys %$tokens, $dspam, $dham, $ttl || 0);
+    };
+    $cnt == $ntokens
+      or die sprintf("bayes: multi_tok_count_change got %d, expected %d\n",
+                     $cnt, $ntokens);
+
+  } else {  # no Lua, slower
+    # unlearning reads the new count back, so it calls the client directly
+    if ($dspam > 0 || $dham > 0) {  # learning
+      while (my($token,$v) = each(%$tokens)) {
+        $self->_hincrby_p('w:'.$token, 's', $dspam)  if $dspam > 0;
+        $self->_hincrby_p('w:'.$token, 'h', $dham)   if $dham  > 0;
+        $self->_expire_p('w:'.$token, $ttl)  if $ttl;
+      }
+      $self->_wait_all_responses;
     }
-  }
 
-  $self->_wait_all_responses;
+    if ($dspam < 0 || $dham < 0) {  # unlearning - rare, not as efficient
+      while (my($token,$v) = each(%$tokens)) {
+        if ($dspam < 0) {
+          my $result = $self->{redis}->hincrby('w:'.$token, 's', int $dspam);
+          if (!$result || $result <= 0) {  # count used up, drop the field
+            $self->{redis}->hdel('w:'.$token, 's');
+          } elsif ($ttl) {
+            $self->{redis}->expire('w:'.$token, $ttl);
+          }
+        }
+        if ($dham < 0) {
+          my $result = $self->{redis}->hincrby('w:'.$token, 'h', int $dham);
+          if (!$result || $result <= 0) {  # count used up, drop the field
+            $self->{redis}->hdel('w:'.$token, 'h');
+          } elsif ($ttl) {
+            $self->{redis}->expire('w:'.$token, $ttl);
+          }
+        }
+      }
+    }
+  }
 
   return 1;
 }
@@ -722,15 +805,34 @@ if the existing token atime is < C<$atim
 sub tok_touch_all {
   my($self, $tokens, $newatime) = @_;
 
-  return 1 unless defined $self->{expire_token};
+  my $ttl = $self->{expire_token};  # time-to-live, in seconds
+  return 1 unless defined $ttl;  # no expiry configured, nothing to refresh
+
+  dbg("bayes: tok_touch_all setting expire to %s on %d tokens",
+      $ttl, scalar @$tokens);
 
   # We just refresh TTL on all
-  foreach (@$tokens) {
-    $self->_expire_p("t:$_", $self->{expire_token});
+  if (! $self->{have_lua} ) {
+    $self->_expire_p("w:$_", $ttl) for @$tokens;  # one expire per token key
+    $self->_wait_all_responses;
+
+  } else {  # have Lua, faster
+    my $r = $self->{redis};
+    my $cnt;  # reply of the script, compared with the token count below
+    eval {
+      $cnt = $r->evalsha($self->{multi_expire_script},
+                         scalar @$tokens, @$tokens, $ttl);  # nkeys, keys, ttl
+      1;
+    } or do {  # Lua script probably not cached, define again and re-try
+      $@ =~ /^\Q[evalsha] NOSCRIPT\E/ or die "bayes: Redis LUA error: $@\n";
+      $self->_define_lua_scripts;
+      $cnt = $r->evalsha($self->{multi_expire_script},
+                         scalar @$tokens, @$tokens, $ttl);
+    };
+    $cnt == @$tokens  # a mismatch is treated as fatal
+      or die sprintf("bayes: tok_touch_all got %d, expected %d\n",
+                     $cnt, scalar @$tokens);
   }
-
-  $self->_wait_all_responses;
-
   return 1;
 }
 
@@ -827,43 +929,60 @@ sub backup_database {
   print "v\t$vars[1]\tnum_spam\n";
   print "v\t$vars[2]\tnum_nonspam\n";
 
-  # Process tokens in chunks of 10000 to save some memory on large sets
-  # (sadly it's impossible to prevent Redis-module itself keeping all
-  # resulting keys in memory)
-
-  $self->{redis}->keys('t:*', sub {
-    my ($reply, $error) = @_;
-    die "bayes: token keys fetch failed: $error" if defined $error;
-    for (my $i = 0; $i < @$reply; $i += 10000) {
-      my $end = $i + 10000 > @$reply ? @$reply - 1 : $i + 9999;
-      my @t = @$reply[$i .. $end];
-      my $v = $self->_mget(\@t);
-      die "bayes: token fetch failed" unless $v && @$v;
-      for (my $i = 0; $i < @$v; $i++) {
-	next unless defined $v->[$i];
-        my($ts, $th) = _unpack_token($v->[$i]);
-        my $encoded = unpack("H*", substr($t[$i], 2));
-        printf("t\t%d\t%d\t%s\t%s\n", $ts, $th, $atime, $encoded);
+  my $r = $self->{redis};
+
+  # Sadly it's impossible to prevent Redis-module itself keeping all
+  # resulting keys in memory.
+  my @keys;
+
+  # let's get past this terrible command as fast as possible
+  @keys = $r->keys('w:*');
+  dbg("bayes: fetched %d token keys", scalar @keys);
+
+  # process tokens in chunks of 1000
+  for (my $i = 0; $i <= $#keys; $i += 1000) {
+    my $end = $i + 999 >= $#keys ? $#keys : $i + 999;
+
+    if ($self->{have_lua}) {
+      my @tokens = map(substr($_,2), @keys[$i .. $end]);  # strip leading "w:"
+      my @results = $r->evalsha($self->{multi_hmget_script},
+                                scalar @tokens, @tokens);
+      foreach my $token (@tokens) {
+        my($s,$h) = split(m{/}, shift @results, 2);
+        my $encoded = unpack("H*", $token);
+        printf("t\t%d\t%d\t%s\t%s\n", $s, $h, $atime, $encoded)  if $s || $h;
       }
-    }
-  });
-  $self->{redis}->wait_all_responses;
 
-  $self->{redis}->keys('s:*', sub {
-    my ($reply, $error) = @_;
-    die "bayes: seen keys fetch failed: $error" if defined $error;
-    for (my $i = 0; $i < @$reply; $i += 10000) {
-      my $end = $i + 10000 > @$reply ? @$reply - 1 : $i + 9999;
-      my @t = @$reply[$i .. $end];
-      my $v = $self->_mget(\@t);
-      die "bayes: seen fetch failed" unless $v && @$v;
-      for (my $i = 0; $i < @$v; $i++) {
-	next unless defined $v->[$i];
-        printf("s\t%s\t%s\n", $v->[$i], substr($t[$i], 2));
+    } else {   # no Lua, slower
+      for (my $j = $i; $j <= $end; $j++) {
+        my $token = $keys[$j];
+        $r->hmget($token, 's', 'h', sub {
+          my($values, $error) = @_;
+          return if !$values || @$values != 2;
+          return if !$values->[0] && !$values->[1];
+          my $encoded = unpack("H*", substr($token, 2));
+          printf("t\t%d\t%d\t%s\t%s\n",
+                 $values->[0]||0, $values->[1]||0, $atime, $encoded);
+          1;
+        });
       }
+      $self->_wait_all_responses;
     }
-  });
-  $self->{redis}->wait_all_responses;
+  }
+
+  @keys = $r->keys('s:*');
+  dbg("bayes: fetched %d seen keys", scalar @keys);
+
+  for (my $i = 0; $i <= $#keys; $i += 1000) {
+    my $end = $i + 999 >= $#keys ? $#keys : $i + 999;
+    my @t = @keys[$i .. $end];
+    my $v = $self->_mget(\@t);
+    die "bayes: seen fetch failed" unless $v && @$v;
+    for (my $i = 0; $i < @$v; $i++) {
+      next unless defined $v->[$i];
+      printf("s\t%s\t%s\n", $v->[$i], substr($t[$i], 2));
+    }
+  }
 
   $self->untie_db();
 
@@ -959,9 +1078,12 @@ sub restore_database {
         # turn unpacked binary token back into binary value
         $token = pack("H*",$token);
       }
+      my $key = 'w:'.$token;
+      $self->_hincrby_p($key, 's', int $spam_count) if $spam_count > 0;
+      $self->_hincrby_p($key, 'h', int $ham_count)  if $ham_count  > 0;
+      $self->_expire_p($key, $self->{expire_token})
+        if defined $self->{expire_token};
 
-      $self->_set_p("t:$token", _pack_token($spam_count, $ham_count),
-                    $self->{expire_token});
       $self->{redis}->wait_all_responses if ++$q_cnt % 10000 == 0;
       $token_count++;
     } elsif ($line =~ /^s\s+/) { # seen line
@@ -1056,41 +1178,64 @@ sub db_writable {
          $self->{is_writable};
 }
 
-# token marshalling format for tokens
-# pack CC for values <256, VV for the rest, keep it simple
-
-sub _unpack_token {
-  my $value = shift;
-
-  my ($ts, $th);
-
-  if (length($value) == 2) {
-    ($ts, $th) = unpack("CC", $value);
-  }
-  elsif (length($value) == 8) {
-    ($ts, $th) = unpack("VV", $value);
-  }
-  else {
-    dbg("bayes: unknown token format: ".unpack("H*", $value));
-  }
+#
+# Redis functions
+#
 
-  return ($ts||0, $th||0);
-}
+sub _define_lua_scripts {  # load Lua helpers, keep the ids used with evalsha
+  my $self = shift;
+  dbg("bayes: defining Lua scripts");
+  my $r = $self->{redis};
 
-sub _pack_token {
-  my($ts, $th) = @_;
+  $self->{multi_hmget_script} = $r->script_load(<<'END');
+    local r = {}
+    for j = 1, #KEYS do
+      local sh = redis.call("hmget", "w:" .. KEYS[j], "s", "h")
+      -- returns counts as "spam/ham" pairs
+      table.insert(r, (sh[1] or 0) .. "/" .. (sh[2] or 0))
+    end
+    return r
+END
+  # KEYS: tokens, ARGV[1]: ttl; sets the expiry on each "w:" key
+  $self->{multi_expire_script} = $r->script_load(<<'END');
+    local ttl = ARGV[1]
+    for j = 1, #KEYS do
+      redis.call("expire", "w:" .. KEYS[j], ttl)
+    end
+    return #KEYS
+END
+  # KEYS: tokens, ARGV: spam increment, ham increment, ttl
+  $self->{multi_hincrby} = $r->script_load(<<'END');
+    local s, h, ttl = ARGV[1], ARGV[2], ARGV[3]
+    local set_expire = (tonumber(ttl or 0) or 0) > 0  -- nil-safe, ttl is text
+    if tonumber(s) ~= 0 then  -- ARGV holds strings, compare numerically
+      for j = 1, #KEYS do
+        local token = "w:" .. KEYS[j]
+        local cnt = redis.call("hincrby", token, "s", s)
+        if cnt <= 0 then
+          redis.call("hdel", token, "s")
+        elseif set_expire then
+          redis.call("expire", token, ttl)
+        end
+      end
+    end
+    if tonumber(h) ~= 0 then  -- skip the pass when there is no ham change
+      for j = 1, #KEYS do
+        local token = "w:" .. KEYS[j]
+        local cnt = redis.call("hincrby", token, "h", h)
+        if cnt <= 0 then
+          redis.call("hdel", token, "h")
+        elseif set_expire then
+          redis.call("expire", token, ttl)
+        end
+      end
+    end
+    return #KEYS
+END
 
-  if ($ts < 256 && $th < 256) {
-    return pack("CC", $ts, $th);
-  } else {
-    return pack("VV", $ts, $th);
-  }
+  1;
 }
 
-#
-# Redis functions
-#
-
 sub _get {
   my ($self, $key) = @_;
 
@@ -1131,6 +1276,25 @@ sub _mget {
   return \@values;
 }
 
+sub _hmget {
+  my $self = shift;
+  my ($key, @fields) = @_;
+  my $reply;  # filled in by the closure below
+  my $failure = $self->{timer}->run_and_catch(sub {
+    $reply = $self->{redis}->hmget($key, @fields);
+  });
+
+  # a timeout is reported in preference to any other error
+  die("bayes: hmget timed out!")  if $self->{timer}->timed_out();
+  if ($failure) {
+    $failure =~ s{ at /.*}{}s; # skip full trace
+    die("bayes: hmget failed: $failure");
+  }
+
+  # whatever the client's hmget handed back in scalar context
+  return $reply;
+}
+
 sub _set {
   my ($self, $key, $value, $expire) = @_;
 
@@ -1153,6 +1317,24 @@ sub _set {
   return 1;
 }
 
+sub _hincrby {
+  my $self = shift;
+  my ($key, $field, $incr) = @_;
+  # issue hincrby through the timer's run_and_catch; the reply is not kept
+  my $failure = $self->{timer}->run_and_catch(sub {
+    $self->{redis}->hincrby($key, $field, $incr);
+  });
+  die("bayes: hincrby timed out!")  if $self->{timer}->timed_out();
+  if ($failure) {
+    $failure =~ s{ at /.*}{}s; # skip full trace
+    die("bayes: hincrby failed: $failure");
+  }
+
+  # NOTE(review): always returns 1, never the incremented count, so a
+  # caller cannot tell from the result whether the field fell to <= 0
+  return 1;
+}
+
 # Pipelined set, must call _wait_all_responses after
 sub _set_p {
   my ($self, $key, $value, $expire) = @_;
@@ -1166,6 +1348,15 @@ sub _set_p {
   return 1;
 }
 
+# Pipelined variant of hincrby; follow up with _wait_all_responses
+sub _hincrby_p {
+  my $self = shift;
+  my ($key, $field, $incr) = @_;
+  # the empty callback means the reply is discarded when it arrives
+  $self->{redis}->hincrby($key, $field, $incr, sub {});
+  return 1;
+}
+
 # Pipelined del, must call _wait_all_responses after
 sub _del_p {
   my ($self, $key) = @_;
@@ -1206,24 +1397,6 @@ sub _wait_all_responses {
   return 1;
 }
 
-sub _mset {
-  my ($self, $values) = @_;
-
-  my $err = $self->{timer}->run_and_catch(sub {
-    $self->{redis}->mset(@$values);
-  });
-
-  if ($self->{timer}->timed_out()) {
-    die("bayes: mset timed out!");
-  }
-  elsif ($err) {
-    $err =~ s{ at /.*}{}s; # skip full trace
-    die("bayes: mset failed: $err");
-  }
-
-  return 1;
-}
-
 sub _del {
   my ($self, $key) = @_;
 

Modified: spamassassin/trunk/lib/Mail/SpamAssassin/Util/DependencyInfo.pm
URL: http://svn.apache.org/viewvc/spamassassin/trunk/lib/Mail/SpamAssassin/Util/DependencyInfo.pm?rev=1494137&r1=1494136&r2=1494137&view=diff
==============================================================================
--- spamassassin/trunk/lib/Mail/SpamAssassin/Util/DependencyInfo.pm (original)
+++ spamassassin/trunk/lib/Mail/SpamAssassin/Util/DependencyInfo.pm Tue Jun 18 13:55:21 2013
@@ -217,6 +217,12 @@ $have_sha ? {
   your database.',
 },
 {
+  module => 'Redis',
+  version => 1.954,
+  desc => 'If you intend to use SpamAssassin with a Redis database backend for
+  Bayes storage, you will need to have this module installed.',
+},
+{
   module => 'Getopt::Long',
   version => '2.32',        # min version was included in 5.8.0, which works
   desc => 'The "sa-stats.pl" program included in "tools", used to generate