You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spamassassin.apache.org by mm...@apache.org on 2013/05/31 16:17:49 UTC

svn commit: r1488244 - in /spamassassin/trunk/lib/Mail/SpamAssassin: BayesStore.pm BayesStore/Redis.pm Conf.pm Plugin/Bayes.pm

Author: mmartinec
Date: Fri May 31 14:17:49 2013
New Revision: 1488244

URL: http://svn.apache.org/r1488244
Log:
Bug 6942: Redis bayes storage module fixes and updates

Modified:
    spamassassin/trunk/lib/Mail/SpamAssassin/BayesStore.pm
    spamassassin/trunk/lib/Mail/SpamAssassin/BayesStore/Redis.pm
    spamassassin/trunk/lib/Mail/SpamAssassin/Conf.pm
    spamassassin/trunk/lib/Mail/SpamAssassin/Plugin/Bayes.pm

Modified: spamassassin/trunk/lib/Mail/SpamAssassin/BayesStore.pm
URL: http://svn.apache.org/viewvc/spamassassin/trunk/lib/Mail/SpamAssassin/BayesStore.pm?rev=1488244&r1=1488243&r2=1488244&view=diff
==============================================================================
--- spamassassin/trunk/lib/Mail/SpamAssassin/BayesStore.pm (original)
+++ spamassassin/trunk/lib/Mail/SpamAssassin/BayesStore.pm Fri May 31 14:17:49 2013
@@ -117,6 +117,33 @@ sub read_db_configs {
   $self->{bayes}->read_db_configs();
 }
 
+=item prefork_init
+
+public instance (Boolean) prefork_init ()
+
+Description:
+This optional method is called in the parent process shortly before
+forking off child processes.
+
+=cut
+
+# sub prefork_init {
+#   my ($self) = @_;
+# }
+
+=item spamd_child_init
+
+public instance (Boolean) spamd_child_init ()
+
+Description:
+This optional method is called in a child process shortly after being spawned.
+
+=cut
+
+# sub spamd_child_init {
+#   my ($self) = @_;
+# }
+
 =item tie_db_readonly
 
 public instance (Boolean) tie_db_readonly ()

Modified: spamassassin/trunk/lib/Mail/SpamAssassin/BayesStore/Redis.pm
URL: http://svn.apache.org/viewvc/spamassassin/trunk/lib/Mail/SpamAssassin/BayesStore/Redis.pm?rev=1488244&r1=1488243&r2=1488244&view=diff
==============================================================================
--- spamassassin/trunk/lib/Mail/SpamAssassin/BayesStore/Redis.pm (original)
+++ spamassassin/trunk/lib/Mail/SpamAssassin/BayesStore/Redis.pm Fri May 31 14:17:49 2013
@@ -26,7 +26,7 @@ Mail::SpamAssassin::BayesStore::Redis - 
 This module implementes a Redis based bayesian storage module.
 !! IT IS STILL EXPERIMENTAL AND SUBJECT TO CHANGE !!
 
-These config variables have been hijacked for our purposes:
+The following config variable has been hijacked for our purposes:
 
   bayes_sql_dsn
 
@@ -37,15 +37,22 @@ These config variables have been hijacke
     To use non-default database id, use "database=x". This is not passed
     to new(), but specially handled to call Redis->select($id).
 
-  bayes_expiry_max_db_size
+  bayes_token_ttl
 
-    Controls token/seen expiry (ttl value in SECONDS, sent as is to Redis).
-    Default 150000 (41 hours) is sane (that's why we abuse this variable),
-    but you should try atleast 604800 (1 week).
-
-Expiry is done internally in Redis using EXPIRY value mentioned above. This
-is why --force-expire etc does nothing and token counts and atime values are
-shown zero in statistics.
+    Controls token expiry (ttl value in SECONDS, sent as is to Redis)
+    when bayes_auto_expire is true. Default value is 3 weeks (but check
+    Mail::SpamAssassin::Conf.pm to make sure).
+
+  bayes_seen_ttl
+
+    Controls 'seen' expiry (ttl value in SECONDS, sent as is to Redis)
+    when bayes_auto_expire is true. Default value is 8 days (but check
+    Mail::SpamAssassin::Conf.pm to make sure).
+
+Expiry is done internally in Redis using the bayes_*_ttl values mentioned
+above, but only if bayes_auto_expire is true (which is the default). This
+is why --force-expire etc. does nothing, and why token counts and atime
+values are shown as zero in statistics.
 
 =cut
 
@@ -95,12 +102,13 @@ sub new {
   my $self = $class->SUPER::new(@_);
 
   unless (HAS_REDIS) {
-    dbg("bayes: unable to connect to database: DBI module not available: $!");
+    dbg("bayes: unable to connect to database: Redis module not available");
   }
 
+  my $bconf = $self->{bayes}->{conf};
   push @{$self->{redis_conf}}, 'encoding' => undef;
 
-  foreach (split(';', $self->{bayes}->{conf}->{bayes_sql_dsn})) {
+  foreach (split(';', $bconf->{bayes_sql_dsn})) {
     my ($a, $b) = split('=');
     unless (defined $b) {
       warn("bayes: invalid bayes_sql_dsn config\n");
@@ -115,10 +123,11 @@ sub new {
     }
   }
 
-  $self->{expire_seen} =
-    $self->{bayes}->{conf}->{bayes_expiry_max_db_size} || 150000;
-  $self->{expire_token} =
-    $self->{bayes}->{conf}->{bayes_expiry_max_db_size} || 150000;
+  $self->{expire_token} = $bconf->{bayes_token_ttl};
+  $self->{expire_seen}  = $bconf->{bayes_seen_ttl};
+  if (!$bconf->{bayes_auto_expire}) {
+    $self->{expire_token} = $self->{expire_seen} = undef;
+  }
 
   $self->{supported_db_version} = 3;
   $self->{is_really_open} = 0;
@@ -132,6 +141,80 @@ sub new {
   return $self;
 }
 
+sub DESTROY {  # close only a Redis session opened by this very process
+  my($self) = @_;
+  local($@, $!);  # do not clobber an error that is being propagated
+  return if !$self->{is_really_open} || ($self->{opened_from_pid}||0) != $$;
+  eval { $self->{redis}->quit } if $self->{redis};  # ignoring any failures
+}
+
+=head2 prefork_init
+
+public instance (Boolean) prefork_init ();
+
+Description:
+This optional method is called in the parent process shortly before
+forking off child processes.
+
+=cut
+
+sub prefork_init {
+  my ($self) = @_;
+  # Called in the parent process shortly before child processes are forked.
+  HAS_REDIS or return;
+
+  # Each child process must establish its own connection with a Redis server;
+  # re-using a common forked socket leads to serious trouble (garbled data).
+  #
+  # The parent may have connected during startup, but that session is of no
+  # further use, so it is shut down here in the master process; a spawned
+  # child process opens its own connection later, when it ties the database.
+
+  if ($self->{is_really_open}) {
+    dbg("bayes: prefork_init, closing a session ".
+        "with a Redis server in a parent process");
+    $self->untie_db;
+    if ($self->{redis}) {
+      eval { $self->{redis}->quit };  # close session, ignoring any failures
+    }
+    undef $self->{redis};  # forget the handle so it cannot be reused
+    $self->{is_really_open} = 0;  # next tie_db_* call reconnects via _open_db
+  }
+}
+
+=head2 spamd_child_init
+
+public instance (Boolean) spamd_child_init ();
+
+Description:
+This optional method is called in a child process shortly after being spawned.
+
+=cut
+
+sub spamd_child_init {
+  my ($self) = @_;
+  # Called in a child process shortly after it has been spawned.
+  HAS_REDIS or return;
+
+  # Each child process must establish its own connection with a Redis server;
+  # re-using a common forked socket leads to serious trouble (garbled data).
+  #
+  # Just in case the parent (master) process did not call prefork_init(),
+  # we silently drop the connection cloned from the parent here, rather
+  # than quitting it. As the prefork_init plugin callback was only introduced
+  # in SpamAssassin 3.4.0, this can happen when some third-party software
+  # (or a pre-3.4.0 version of spamd) is using this plugin.
+  # Better safe than sorry...
+
+  if ($self->{is_really_open}) {
+    dbg("bayes: spamd_child_init, closing a parent's session ".
+        "with a Redis server in a child process");
+    $self->untie_db;
+    undef $self->{redis};  # just drop it, don't shut down parent's session
+    $self->{is_really_open} = 0;  # next tie_db_* call opens a fresh session
+  }
+}
+
 =head2 tie_db_readonly
 
 public instance (Boolean) tie_db_readonly ();
@@ -147,10 +230,15 @@ sub tie_db_readonly {
 
   return 0 unless (HAS_REDIS);
 
-  my $result = $self->{is_really_open} || $self->_open_db();
-  $self->{is_writable} = 0 if $result;
+  my $really_open = $self->{is_really_open};
+  if ($really_open) {
+    $self->{is_officially_open} = 1;
+  } else {
+    $really_open = $self->_open_db();
+  }
+  $self->{is_writable} = 0;
 
-  return $result;
+  return $really_open;
 }
 
 =head2 tie_db_writable
@@ -169,10 +257,16 @@ sub tie_db_writable {
 
   return 0 unless (HAS_REDIS);
 
-  my $result = $self->{is_really_open} || $self->_open_db();
-  $self->{is_writable} = 1 if $result;
+  my $really_open = $self->{is_really_open};
+  if ($really_open) {
+    $self->{is_officially_open} = 1;
+  } else {
+    $really_open = $self->_open_db();
+  }
+
+  $self->{is_writable} = 1 if $really_open;
 
-  return $result;
+  return $really_open;
 }
 
 =head2 _open_db
@@ -194,13 +288,15 @@ sub _open_db {
       Redis->VERSION);
 
   if ($self->{is_really_open}) {
-      $self->{is_officially_open} = 1;
-      return 1;
+    $self->{is_officially_open} = 1;
+    return 1;
   }
 
   $self->read_db_configs();
 
   my $err = $self->{timer}->run_and_catch(sub {
+    $self->{opened_from_pid} = $$;
+    # will keep a persistent session open to a redis server
     $self->{redis} = Redis->new(@{$self->{redis_conf}});
     $self->{redis}->select($self->{db_id}) if defined $self->{db_id};
   });
@@ -210,7 +306,7 @@ sub _open_db {
     return 0;
   }
   elsif ($err) {
-    $err =~ s! at /.*!!s; # skip full trace
+    $err =~ s{ at /.*}{}s; # skip full trace
     warn("bayes: Redis connection failed: $err");
     return 0;
   }
@@ -374,12 +470,9 @@ sub get_storage_variables {
                qw{LAST_JOURNAL_SYNC NSPAM NHAM NTOKENS LAST_EXPIRE
                   OLDEST_TOKEN_AGE DB_VERSION LAST_JOURNAL_SYNC
                   LAST_ATIME_DELTA LAST_EXPIRE_REDUCE NEWEST_TOKEN_AGE};
-  my @values = $self->_mget(\@tokens);
-  foreach (@values) {
-    $_ = 0 unless $_;
-  }
-
-  return @values;
+  my $values = $self->_mget(\@tokens);
+  return if !$values;
+  return map(defined $_ ? $_ : 0, @$values);
 }
 
 =head2 get_running_expire_tok
@@ -440,7 +533,8 @@ sub tok_get {
   my($self, $token) = @_;
 
   my $array = $self->tok_get_all($token);
-  return !@$array ? () : (@{$array->[0]})[1,2,3];
+  return if !$array || !@$array;
+  return (@{$array->[0]})[1,2,3];
 }
 
 =head2 tok_get_all
@@ -449,24 +543,33 @@ public instance (\@) tok_get (@ $tokens)
 
 Description:
 This method retrieves the specified tokens (C<$tokens>) from storage and
-returns an array ref of arrays spam count, ham acount and last access time.
+returns a ref to an array of [token, spam count, ham count, atime] per token found.
 
 =cut
 
 sub tok_get_all {
-  my($self, @keys) = @_;
+  my $self = shift;
+# my @keys = @_;  # avoid copying strings unnecessarily
 
-  my @t = map {"t:$_"} @keys;
-  my @results = $self->_mget(\@t);
-  my @values;
+  my @t = map("t:$_", @_);
+  my $results = $self->_mget(\@t);
 
-  foreach my $token (@keys) {
-    my $value = shift(@results);
+  $results or die "bayes: tok_get_all - no results from mget";
+  if (@$results != @t) {
+    warn sprintf("bayes: tok_get_all: asked for %d tokens, got %d\n",
+                 scalar @t, scalar @$results);
+    return;
+  }
+
+  my $j = 0;
+  my @values;
+  foreach my $token (@_) {
+    my $value = $results->[$j++];
     push(@values, [$token, _unpack_token($value), 0]) if defined $value;
   }
 
-  dbg("bayes: tok_get_all found %d tokens out of %d search keys",
-      scalar(@values), scalar(@keys));
+  dbg("bayes: tok_get_all found %d tokens out of %d",
+      scalar @values, scalar @t);
 
   return \@values;
 }
@@ -495,7 +598,7 @@ public instance (Boolean) multi_tok_coun
 
 Description:
 This method takes a C<$dspam> and C<$dham> and adds it to all of the
-tokens in the C<$tokens> hash ref along with updating each tokens
+tokens in the C<$tokens> hash ref along with updating each token's
 atime with C<$atime>.
 
 =cut
@@ -507,11 +610,19 @@ sub multi_tok_count_change {
   $dspam ||= 0;
   $dham ||= 0;
 
-  my @t = map {"t:$_"} keys %{$tokens};
-  my @v = $self->_mget(\@t);
+  my @t = map("t:$_", keys %{$tokens});
+  my $v = $self->_mget(\@t);
 
+  $v or die "bayes: multi_tok_count_change - no results from mget";
+  if (@$v != @t) {
+    warn sprintf("bayes: multi_tok_count_change: asked for %d tokens, got %d\n",
+                 scalar @t, scalar @$v);
+    return;
+  }
+
+  my $j = 0;
   foreach my $token (@t) {
-    my $value = shift(@v);
+    my $value = $v->[$j++];
     my ($spam, $ham) = defined $value ? _unpack_token($value) : (0,0);
     $spam += $dspam;
     $ham += $dham;
@@ -570,7 +681,7 @@ sub nspam_nham_change {
     die("bayes: Redis connection timed out!");
   }
   elsif ($err) {
-    $err =~ s! at /.*!!s; # skip full trace
+    $err =~ s{ at /.*}{}s; # skip full trace
     die("bayes: failed to increment nspam $ds nham $dh: $err");
   }
 
@@ -612,8 +723,8 @@ sub tok_touch_all {
   my($self, $tokens, $newatime) = @_;
 
   # We just refresh TTL on all
-  foreach (map {"t:$_"} @$tokens) {
-    $self->_expire_p($_, $self->{expire_token});
+  foreach (@$tokens) {
+    $self->_expire_p("t:$_", $self->{expire_token});
   }
 
   $self->_wait_all_responses;
@@ -724,13 +835,13 @@ sub backup_database {
     for (my $i = 0; $i < @$reply; $i += 10000) {
       my $end = $i + 10000 > @$reply ? @$reply - 1 : $i + 9999;
       my @t = @$reply[$i .. $end];
-      my @v = $self->_mget(\@t);
-      die "bayes: token fetch failed" unless @v;
-      for (my $i = 0; $i < @v; $i++) {
-	next unless defined $v[$i];
-        my($ts, $th) = _unpack_token($v[$i]);
+      my $v = $self->_mget(\@t);
+      die "bayes: token fetch failed" unless $v && @$v;
+      for (my $i = 0; $i < @$v; $i++) {
+	next unless defined $v->[$i];
+        my($ts, $th) = _unpack_token($v->[$i]);
         my $encoded = unpack("H*", substr($t[$i], 2));
-        print "t\t$ts\t$th\t$atime\t$encoded\n";
+        printf("t\t%d\t%d\t%s\t%s\n", $ts, $th, $atime, $encoded);
       }
     }
   });
@@ -742,11 +853,11 @@ sub backup_database {
     for (my $i = 0; $i < @$reply; $i += 10000) {
       my $end = $i + 10000 > @$reply ? @$reply - 1 : $i + 9999;
       my @t = @$reply[$i .. $end];
-      my @v = $self->_mget(\@t);
-      die "bayes: seen fetch failed" unless @v;
-      for (my $i = 0; $i < @v; $i++) {
-	next unless defined $v[$i];
-        print "s\t$v[$i]\t".substr($t[$i], 2)."\n";
+      my $v = $self->_mget(\@t);
+      die "bayes: seen fetch failed" unless $v && @$v;
+      for (my $i = 0; $i < @$v; $i++) {
+	next unless defined $v->[$i];
+        printf("s\t%s\t%s\n", $v->[$i], substr($t[$i], 2));
       }
     }
   });
@@ -940,7 +1051,7 @@ sub db_writable {
   my($self) = @_;
 
   return $self->{is_really_open} && $self->{is_officially_open} &&
-           $self->{is_writable};
+         $self->{is_writable};
 }
 
 # token marshalling format for tokens
@@ -991,7 +1102,7 @@ sub _get {
     die("bayes: get timed out!");
   }
   elsif ($err) {
-    $err =~ s! at /.*!!s; # skip full trace
+    $err =~ s{ at /.*}{}s; # skip full trace
     die("bayes: get failed: $err");
   }
 
@@ -1011,11 +1122,11 @@ sub _mget {
     die("bayes: mget timed out!");
   }
   elsif ($err) {
-    $err =~ s! at /.*!!s; # skip full trace
+    $err =~ s{ at /.*}{}s; # skip full trace
     die("bayes: mget failed: $err");
   }
 
-  return @values;
+  return \@values;
 }
 
 sub _set {
@@ -1033,7 +1144,7 @@ sub _set {
     die("bayes: set timed out!");
   }
   elsif ($err) {
-    $err =~ s! at /.*!!s; # skip full trace
+    $err =~ s{ at /.*}{}s; # skip full trace
     die("bayes: set failed: $err");
   }
 
@@ -1079,11 +1190,13 @@ sub _wait_all_responses {
   });
 
   if ($self->{timer}->timed_out()) {
-    die("bayes: wait_all_responses timed out!");
+    die sprintf("bayes: wait_all_responses timed out! called from line %s\n",
+                (caller)[2]);
   }
   elsif ($err) {
-    $err =~ s! at /.*!!s; # skip full trace
-    die("bayes: wait_all_responses failed: $err");
+    $err =~ s{ at /.*}{}s; # skip full trace
+    die sprintf("bayes: wait_all_responses failed: %s, called from line %s\n",
+                $err, (caller)[2]);
   }
 
   return 1;
@@ -1100,7 +1213,7 @@ sub _mset {
     die("bayes: mset timed out!");
   }
   elsif ($err) {
-    $err =~ s! at /.*!!s; # skip full trace
+    $err =~ s{ at /.*}{}s; # skip full trace
     die("bayes: mset failed: $err");
   }
 
@@ -1118,7 +1231,7 @@ sub _del {
     die("bayes: del timed out!");
   }
   elsif ($err) {
-    $err =~ s! at /.*!!s; # skip full trace
+    $err =~ s{ at /.*}{}s; # skip full trace
     die("bayes: mset failed: $err");
   }
 

Modified: spamassassin/trunk/lib/Mail/SpamAssassin/Conf.pm
URL: http://svn.apache.org/viewvc/spamassassin/trunk/lib/Mail/SpamAssassin/Conf.pm?rev=1488244&r1=1488243&r2=1488244&view=diff
==============================================================================
--- spamassassin/trunk/lib/Mail/SpamAssassin/Conf.pm (original)
+++ spamassassin/trunk/lib/Mail/SpamAssassin/Conf.pm Fri May 31 14:17:49 2013
@@ -2089,7 +2089,9 @@ equivalent to a 8Mb database file.
 
 If enabled, the Bayes system will try to automatically expire old tokens
 from the database.  Auto-expiry occurs when the number of tokens in the
-database surpasses the bayes_expiry_max_db_size value.
+database surpasses the bayes_expiry_max_db_size value. A backend that does
+implement individual key/value expirations (currently only Redis) instead
+expires entries by their bayes_token_ttl and bayes_seen_ttl time-to-live.
 
 =cut
 
@@ -2099,6 +2101,39 @@ database surpasses the bayes_expiry_max_
     type => $CONF_TYPE_BOOL,
   });
 
+=item bayes_token_ttl       		(default: 1814400, i.e. 3 weeks)
+
+If bayes_auto_expire is true and a Bayes datastore backend supports it
+(currently only Redis), this is a time-to-live / expiration time in seconds
+(since the last time they were touched) for tokens kept in a Bayes database.
+The value is observed on a best-effort basis, exact timing promises are not
+necessarily kept. If a bayes datastore backend does not implement individual
+key/value expirations, the setting is silently ignored.
+
+=cut
+
+  push (@cmds, {
+    setting => 'bayes_token_ttl',
+    default => 3*7*24*60*60,  # seconds (3 weeks)
+    type => $CONF_TYPE_NUMERIC,
+  });
+
+=item bayes_seen_ttl       		(default: 691200, i.e. 8 days)
+
+If bayes_auto_expire is true and a Bayes datastore backend supports it
+(currently only Redis), this is a time-to-live / expiration time in seconds
+for 'seen' entries (i.e. mail message digests with their status) kept in a
+Bayes database, observed on a best-effort basis. Backends that do not
+implement individual key/value expirations silently ignore this setting.
+
+=cut
+
+  push (@cmds, {
+    setting => 'bayes_seen_ttl',
+    default => 8*24*60*60,  # seconds (8 days)
+    type => $CONF_TYPE_NUMERIC,
+  });
+
 =item bayes_learn_to_journal  	(default: 0)
 
 If this option is set, whenever SpamAssassin does Bayes learning, it

Modified: spamassassin/trunk/lib/Mail/SpamAssassin/Plugin/Bayes.pm
URL: http://svn.apache.org/viewvc/spamassassin/trunk/lib/Mail/SpamAssassin/Plugin/Bayes.pm?rev=1488244&r1=1488243&r2=1488244&view=diff
==============================================================================
--- spamassassin/trunk/lib/Mail/SpamAssassin/Plugin/Bayes.pm (original)
+++ spamassassin/trunk/lib/Mail/SpamAssassin/Plugin/Bayes.pm Fri May 31 14:17:49 2013
@@ -249,6 +249,8 @@ sub finish {
   %{$self} = ();
 }
 
+###########################################################################
+
 # Plugin hook.
 # Return this implementation object, for callers that need to know
 # it.  TODO: callers shouldn't *need* to know it! 
@@ -258,6 +260,31 @@ sub learner_get_implementation { return 
 
 ###########################################################################
 
+# Plugin hook.
+# Called in the parent process shortly before forking off child processes.
+sub prefork_init {
+  my ($self) = @_;
+  # delegate to the storage backend, but only if it implements the hook
+  my $store = $self->{store};
+  $store->prefork_init
+    if $store && $store->UNIVERSAL::can('prefork_init');
+}
+
+###########################################################################
+
+# Plugin hook.
+# Called in a child process shortly after being spawned.
+sub spamd_child_init {
+  my ($self) = @_;
+  # delegate to the storage backend, but only if it implements the hook
+  my $store = $self->{store};
+  $store->spamd_child_init
+    if $store && $store->UNIVERSAL::can('spamd_child_init');
+}
+
+###########################################################################
+
+# Plugin hook.
 sub check_bayes {
   my ($self, $pms, $fulltext, $min, $max) = @_;
 
@@ -349,6 +376,7 @@ sub learn_message {
 
   eval {
     local $SIG{'__DIE__'};	# do not run user die() traps in here
+    my $timer = $self->{main}->time_method("b_learn");
 
     my $ok;
     if ($self->{main}->{learn_to_journal}) {
@@ -484,6 +512,7 @@ sub forget_message {
   # synchronously
   eval {
     local $SIG{'__DIE__'};	# do not run user die() traps in here
+    my $timer = $self->{main}->time_method("b_learn");
 
     my $ok;
     if ($self->{main}->{learn_to_journal}) {