You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spamassassin.apache.org by he...@apache.org on 2012/12/14 10:56:17 UTC

svn commit: r1421774 - /spamassassin/trunk/lib/Mail/SpamAssassin/BayesStore/Redis.pm

Author: hege
Date: Fri Dec 14 09:56:17 2012
New Revision: 1421774

URL: http://svn.apache.org/viewvc?rev=1421774&view=rev
Log:
Bug 6879: Bayes storage module for Redis

Added:
    spamassassin/trunk/lib/Mail/SpamAssassin/BayesStore/Redis.pm

Added: spamassassin/trunk/lib/Mail/SpamAssassin/BayesStore/Redis.pm
URL: http://svn.apache.org/viewvc/spamassassin/trunk/lib/Mail/SpamAssassin/BayesStore/Redis.pm?rev=1421774&view=auto
==============================================================================
--- spamassassin/trunk/lib/Mail/SpamAssassin/BayesStore/Redis.pm (added)
+++ spamassassin/trunk/lib/Mail/SpamAssassin/BayesStore/Redis.pm Fri Dec 14 09:56:17 2012
@@ -0,0 +1,1119 @@
+# <@LICENSE>
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to you under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at:
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# </...@LICENSE>
+
+=head1 NAME
+
+Mail::SpamAssassin::BayesStore::Redis - Redis Bayesian Storage Module Implementation
+
+=head1 SYNOPSIS
+
+=head1 DESCRIPTION
+
+This module implementes a Redis based bayesian storage module.
+!! IT IS STILL EXPERIMENTAL AND SUBJECT TO CHANGE !!
+
+These config variables have been hijacked for our purposes:
+
+  bayes_sql_dsn
+
+    Optional config parameters sent as is to Redis->new().
+    Example: server=localhost:6379;password=foo
+    By default encoding=undef is set as suggested by Redis module.
+
+    To use non-default database id, use "database=x". This is not passed
+    to new(), but specially handled to call Redis->select($id).
+
+  bayes_expiry_max_db_size
+
+    Controls token/seen expiry (ttl value in SECONDS, sent as is to Redis).
+    Default 150000 (41 hours) is sane (that's why we abuse this variable),
+    but you should try atleast 604800 (1 week).
+
+Expiry is done internally in Redis using EXPIRY value mentioned above. This
+is why --force-expire etc does nothing and token counts and atime values are
+shown zero in statistics.
+
+=cut
+
+package Mail::SpamAssassin::BayesStore::Redis;
+my $VERSION = 0.09;
+
+use strict;
+use warnings;
+use bytes;
+use re 'taint';
+use Errno qw(EBADF);
+use Mail::SpamAssassin::Util qw(untaint_var);
+use Mail::SpamAssassin::Timeout;
+use Redis;
+
+BEGIN {
+  eval { require Digest::SHA; import Digest::SHA qw(sha1); 1 }
+  or do { require Digest::SHA1; import Digest::SHA1 qw(sha1) }
+}
+
+use Mail::SpamAssassin::BayesStore;
+use Mail::SpamAssassin::Logger;
+
+use vars qw( @ISA );
+
+@ISA = qw( Mail::SpamAssassin::BayesStore );
+
+=head1 METHODS
+
+=head2 new
+
+public class (Mail::SpamAssassin::BayesStore::Redis) new (Mail::Spamassassin::Plugin::Bayes $bayes)
+
+Description:
+This methods creates a new instance of the Mail::SpamAssassin::BayesStore::Redis
+object.  It expects to be passed an instance of the Mail::SpamAssassin:Bayes
+object which is passed into the Mail::SpamAssassin::BayesStore parent object.
+
+=cut
+
+sub new {
+  my $class = shift;
+  $class = ref($class) || $class;
+  my $self = $class->SUPER::new(@_);
+
+  push @{$self->{redis_conf}}, 'encoding' => undef;
+
+  foreach (split(';', $self->{bayes}->{conf}->{bayes_sql_dsn})) {
+    my ($a, $b) = split('=');
+    unless (defined $b) {
+      warn("bayes: invalid bayes_sql_dsn config\n");
+      return;
+    }
+    if ($a eq 'database') {
+      $self->{db_id} = $b;
+    }
+    else {
+      push @{$self->{redis_conf}}, $a => $b eq 'undef' ?
+        undef : untaint_var($b);
+    }
+  }
+
+  $self->{expire_seen} =
+    $self->{bayes}->{conf}->{bayes_expiry_max_db_size} || 150000;
+  $self->{expire_token} =
+    $self->{bayes}->{conf}->{bayes_expiry_max_db_size} || 150000;
+
+  $self->{supported_db_version} = 3;
+  $self->{is_really_open} = 0;
+  $self->{is_writable} = 0;
+  $self->{is_officially_open} = 0;
+
+  $self->{timer} = Mail::SpamAssassin::Timeout->new({
+    secs => $self->{conf}->{redis_timeout} || 2
+  });
+
+  return $self;
+}
+
+=head2 tie_db_readonly
+
+public instance (Boolean) tie_db_readonly ();
+
+Description:
+This method ensures that the database connection is properly setup and
+working.
+
+=cut
+
+sub tie_db_readonly {
+  my($self) = @_;
+
+  my $result = $self->{is_really_open} || $self->_open_db();
+  $self->{is_writable} = 0 if $result;
+
+  return $result;
+}
+
+=head2 tie_db_writable
+
+public instance (Boolean) tie_db_writable ()
+
+Description:
+This method ensures that the database connection is properly setup and
+working. If necessary it will initialize the database so that they can
+begin using the database immediately.
+
+=cut
+
+sub tie_db_writable {
+  my($self) = @_;
+
+  my $result = $self->{is_really_open} || $self->_open_db();
+  $self->{is_writable} = 1 if $result;
+
+  return $result;
+}
+
+=head2 _open_db
+
+private instance (Boolean) _open_db (Boolean $writable)
+
+Description:
+This method ensures that the database connection is properly setup and
+working.  It will initialize a users bayes variables so that they
+can begin using the database immediately.
+
+=cut
+
+sub _open_db {
+  my($self) = @_;
+
+  dbg("bayes: _open_db(%s); Redis %s",
+      $self->{is_really_open} ? 'already open' : 'not yet open',
+      Redis->VERSION);
+
+  if ($self->{is_really_open}) {
+      $self->{is_officially_open} = 1;
+      return 1;
+  }
+
+  $self->read_db_configs();
+
+  my $err = $self->{timer}->run_and_catch(sub {
+    $self->{redis} = Redis->new(@{$self->{redis_conf}});
+    $self->{redis}->select($self->{db_id}) if defined $self->{db_id};
+  });
+
+  if ($self->{timer}->timed_out()) {
+    warn("bayes: Redis connection timed out!");
+    return 0;
+  }
+  elsif ($err) {
+    $err =~ s! at /.*!!s; # skip full trace
+    warn("bayes: Redis connection failed: $err");
+    return 0;
+  }
+
+  $self->{db_version} = $self->_get('v:DB_VERSION');
+
+  if (!$self->{db_version}) {
+      $self->{db_version} = $self->DB_VERSION;
+      my $ret = $self->_mset([
+        'v:DB_VERSION', $self->{db_version},
+        'v:NSPAM', 0,
+        'v:NHAM', 0,
+      ]);
+      unless ($ret) {
+          warn("bayes: failed to initialize database");
+          return 0;
+      }
+      dbg("bayes: initialized empty database, version $self->{db_version}");
+  }
+  else {
+    dbg("bayes: found bayes db version $self->{db_version}");
+    if ($self->{db_version} ne $self->DB_VERSION) {
+      warn("bayes: bayes db version $self->{db_version} not supported, aborting\n");
+      return 0;
+    }
+  }
+
+  $self->{is_really_open} = 1;
+  $self->{is_officially_open} = 1;
+
+  return 1;
+}
+
+=head2 untie_db
+
+public instance () untie_db ()
+
+Description:
+Closes any open db handles.  You can safely call this at any time.
+
+=cut
+
+sub untie_db {
+  my $self = shift;
+
+  $self->{is_officially_open} = 0;
+  $self->{is_writable} = 0;
+  return;
+}
+
+=head2 sync_due
+
+public instance (Boolean) sync_due ()
+
+Description:
+This method determines if a database sync is currently required.
+
+Unused for Redis implementation.
+
+=cut
+
+sub sync_due {
+  return 0;
+}
+
+=head2 expiry_due
+
+public instance (Boolean) expiry_due ()
+
+Description:
+This methods determines if an expire is due.
+
+Unused for Redis implementation.
+
+=cut
+
+sub expiry_due {
+  return 0;
+}
+
+=head2 seen_get
+
+public instance (String) seen_get (string $msgid)
+
+Description:
+This method retrieves the stored value, if any, for C<$msgid>.  The return
+value is the stored string ('s' for spam and 'h' for ham) or undef if C<$msgid>
+is not found.
+
+=cut
+
+sub seen_get {
+  my($self, $msgid) = @_;
+
+  return $self->_get("s:$msgid");
+}
+
+=head2 seen_put
+
+public (Boolean) seen_put (string $msgid, char $flag)
+
+Description:
+This method records C<$msgid> as the type given by C<$flag>.  C<$flag> is one
+of two values 's' for spam and 'h' for ham.
+
+=cut
+
+sub seen_put {
+  my($self, $msgid, $flag) = @_;
+
+  $self->_set("s:$msgid", $flag, $self->{expire_seen});
+  return 1;
+}
+
+=head2 seen_delete
+
+public instance (Boolean) seen_delete (string $msgid)
+
+Description:
+This method removes C<$msgid> from the database.
+
+=cut
+
+sub seen_delete {
+  my($self, $msgid) = @_;
+
+  $self->_del("s:$msgid");
+  return 1;
+}
+
+=head2 get_storage_variables
+
+public instance (@) get_storage_variables ()
+
+Description:
+This method retrieves the various administrative variables used by
+the Bayes process and database.
+
+The values returned in the array are in the following order:
+
+0: scan count base
+1: number of spam
+2: number of ham
+3: number of tokens in db
+4: last expire atime
+5: oldest token in db atime
+6: db version value
+7: last journal sync
+8: last atime delta
+9: last expire reduction count
+10: newest token in db atime
+
+Only 1,2,6 are used with Redis, others return zero always.
+
+=cut
+
+sub get_storage_variables {
+  my($self) = @_;
+
+  my @tokens = map {"v:$_"}
+               qw{LAST_JOURNAL_SYNC NSPAM NHAM NTOKENS LAST_EXPIRE
+                  OLDEST_TOKEN_AGE DB_VERSION LAST_JOURNAL_SYNC
+                  LAST_ATIME_DELTA LAST_EXPIRE_REDUCE NEWEST_TOKEN_AGE};
+  my @values = $self->_mget(\@tokens);
+  foreach (@values) {
+    $_ = 0 unless $_;
+  }
+
+  return @values;
+}
+
+=head2 get_running_expire_tok
+
+public instance (String $time) get_running_expire_tok ()
+
+Description:
+This method determines if an expire is currently running and returns
+the last time set.
+
+There can be multiple times, so we just pull the greatest (most recent)
+value.
+
+=cut
+
+sub get_running_expire_tok {
+  return 0;
+}
+
+=head2 set_running_expire_tok
+
+public instance (String $time) set_running_expire_tok ()
+
+Description:
+This method sets the time that an expire starts running.
+
+=cut
+
+sub set_running_expire_tok {
+  return 0;
+}
+
+=head2 remove_running_expire_tok
+
+public instance (Boolean) remove_running_expire_tok ()
+
+Description:
+This method removes the row in the database that indicates that
+and expire is currently running.
+
+=cut
+
+sub remove_running_expire_tok {
+  return 1;
+}
+  
+=head2 tok_get
+
+public instance (Integer, Integer, Integer) tok_get (String $token)
+
+Description:
+This method retrieves a specificed token (C<$token>) from the database
+and returns its spam_count, ham_count and last access time.
+
+=cut
+
+sub tok_get {
+  my($self, $token) = @_;
+
+  my $array = $self->tok_get_all($token);
+  return !@$array ? () : (@{$array->[0]})[1,2,3];
+}
+
+=head2 tok_get_all
+
+public instance (\@) tok_get (@ $tokens)
+
+Description:
+This method retrieves the specified tokens (C<$tokens>) from storage and
+returns an array ref of arrays spam count, ham acount and last access time.
+
+=cut
+
+sub tok_get_all {
+  my($self, @keys) = @_;
+
+  my @t = map {"t:$_"} @keys;
+  my @results = $self->_mget(\@t);
+  my @values;
+
+  foreach my $token (@keys) {
+    my $value = shift(@results);
+    push(@values, [$token, _unpack_token($value), 0]) if defined $value;
+  }
+
+  dbg("bayes: tok_get_all found %d tokens out of %d search keys",
+      scalar(@values), scalar(@keys));
+
+  return \@values;
+}
+
+=head2 tok_count_change
+
+public instance (Boolean) tok_count_change (
+  Integer $dspam, Integer $dham, String $token, String $newatime)
+
+Description:
+This method takes a C<$spam_count> and C<$ham_count> and adds it to
+C<$tok> along with updating C<$tok>s atime with C<$atime>.
+
+=cut
+
+sub tok_count_change {
+  my($self, $dspam, $dham, $token, $newatime) = @_;
+
+  $self->multi_tok_count_change($dspam, $dham, {$token => 1}, $newatime);
+}
+
+=head2 multi_tok_count_change
+
+public instance (Boolean) multi_tok_count_change (
+  Integer $dspam, Integer $dham, \% $tokens, String $newatime)
+
+Description:
+This method takes a C<$dspam> and C<$dham> and adds it to all of the
+tokens in the C<$tokens> hash ref along with updating each tokens
+atime with C<$atime>.
+
+=cut
+
+sub multi_tok_count_change {
+  my($self, $dspam, $dham, $tokens, $newatime) = @_;
+
+  # Make sure we have some values
+  $dspam ||= 0;
+  $dham ||= 0;
+
+  my @t = map {"t:$_"} keys %{$tokens};
+  my @v = $self->_mget(\@t);
+
+  foreach my $token (@t) {
+    my $value = shift(@v);
+    my ($spam, $ham) = defined $value ? _unpack_token($value) : (0,0);
+    $spam += $dspam;
+    $ham += $dham;
+    $spam = 0 if $spam < 0;
+    $ham = 0 if $ham < 0;
+    if ($ham == 0 && $spam == 0) {
+      $self->_del_p($token);
+    } else {
+      $self->_set_p($token, _pack_token($spam, $ham), $self->{expire_token});
+    }
+  }
+
+  $self->_wait_all_responses;
+
+  return 1;
+}
+
+=head2 nspam_nham_get
+
+public instance ($spam_count, $ham_count) nspam_nham_get ()
+
+Description:
+This method retrieves the total number of spam and the total number of
+ham learned.
+
+=cut
+
+sub nspam_nham_get {
+  my($self) = @_;
+
+  my @vars = $self->get_storage_variables();
+  ($vars[1], $vars[2]);
+}
+
+=head2 nspam_nham_change
+
+public instance (Boolean) nspam_nham_change (Integer $num_spam,
+                                             Integer $num_ham)
+
+Description:
+This method updates the number of spam and the number of ham in the database.
+
+=cut
+
+sub nspam_nham_change {
+  my($self, $ds, $dh) = @_;
+
+  return 1 unless $ds || $dh;
+
+  my $err = $self->{timer}->run_and_catch(sub {
+    $self->{redis}->incrby("v:NSPAM", $ds) if $ds;
+    $self->{redis}->incrby("v:NHAM", $dh) if $dh;
+  });
+
+  if ($self->{timer}->timed_out()) {
+    die("bayes: Redis connection timed out!");
+  }
+  elsif ($err) {
+    $err =~ s! at /.*!!s; # skip full trace
+    die("bayes: failed to increment nspam $ds nham $dh: $err");
+  }
+
+  return 1;
+}
+
+=head2 tok_touch
+
+public instance (Boolean) tok_touch (String $token,
+                                     String $atime)
+
+Description:
+This method updates the given tokens (C<$token>) atime.
+
+The assumption is that the token already exists in the database.
+
+We will never update to an older atime
+
+=cut
+
+sub tok_touch {
+  my($self, $token, $atime) = @_;
+
+  return $self->tok_touch_all([$token], $atime);
+}
+
+=head2 tok_touch_all
+
+public instance (Boolean) tok_touch (\@ $tokens
+                                     String $atime)
+
+Description:
+This method does a mass update of the given list of tokens C<$tokens>,
+if the existing token atime is < C<$atime>.
+
+=cut
+
+sub tok_touch_all {
+  my($self, $tokens, $newatime) = @_;
+
+  # We just refresh TTL on all
+  foreach (map {"t:$_"} @$tokens) {
+    $self->_expire_p($_, $self->{expire_token});
+  }
+
+  $self->_wait_all_responses;
+
+  return 1;
+}
+
+=head2 cleanup
+
+public instance (Boolean) cleanup ()
+
+Description:
+This method perfoms any cleanup necessary before moving onto the next
+operation.
+
+=cut
+
+sub cleanup {
+  return 1;
+}
+
+=head2 get_magic_re
+
+public instance (String) get_magic_re ()
+
+Description:
+This method returns a regexp which indicates a magic token.
+
+=cut
+
+use constant get_magic_re => undef;
+
+=head2 sync
+
+public instance (Boolean) sync (\% $opts)
+
+Description:
+This method performs a sync of the database
+
+=cut
+
+sub sync {
+  return 1;
+}
+
+=head2 perform_upgrade
+
+public instance (Boolean) perform_upgrade (\% $opts);
+
+Description:
+Performs an upgrade of the database from one version to another, not
+currently used in this implementation.
+
+=cut
+
+sub perform_upgrade {
+  return 1;
+}
+
+=head2 clear_database
+
+public instance (Boolean) clear_database ()
+
+Description:
+This method deletes all records for a particular user.
+
+Callers should be aware that any errors returned by this method
+could causes the database to be inconsistent for the given user.
+
+=cut
+
+sub clear_database {
+  my($self) = @_;
+
+  # TODO
+  warn("bayes: you need to manually clear Redis database\n");
+
+  return 1;
+}
+
+=head2 backup_database
+
+public instance (Boolean) backup_database ()
+
+Description:
+This method will dump the users database in a machine readable format.
+
+=cut
+
+sub backup_database {
+  my($self) = @_;
+
+  return 0 unless $self->tie_db_writable;
+
+  my $atime = time;
+  my @vars = $self->get_storage_variables;
+  print "v\t$vars[6]\tdb_version # this must be the first line!!!\n";
+  print "v\t$vars[1]\tnum_spam\n";
+  print "v\t$vars[2]\tnum_nonspam\n";
+
+  # Process tokens in chunks of 10000 to save some memory on large sets
+  # (sadly it's impossible to prevent Redis-module itself keeping all
+  # resulting keys in memory)
+
+  $self->{redis}->keys('t:*', sub {
+    my ($reply, $error) = @_;
+    die "bayes: token keys fetch failed: $error" if defined $error;
+    for (my $i = 0; $i < @$reply; $i += 10000) {
+      my $end = $i + 10000 > @$reply ? @$reply - 1 : $i + 9999;
+      my @t = @$reply[$i .. $end];
+      my @v = $self->_mget(\@t);
+      die "bayes: token fetch failed" unless @v;
+      for (my $i = 0; $i < @v; $i++) {
+	next unless defined $v[$i];
+        my($ts, $th) = _unpack_token($v[$i]);
+        my $encoded = unpack("H*", substr($t[$i], 2));
+        print "t\t$ts\t$th\t$atime\t$encoded\n";
+      }
+    }
+  });
+  $self->{redis}->wait_all_responses;
+
+  $self->{redis}->keys('s:*', sub {
+    my ($reply, $error) = @_;
+    die "bayes: seen keys fetch failed: $error" if defined $error;
+    for (my $i = 0; $i < @$reply; $i += 10000) {
+      my $end = $i + 10000 > @$reply ? @$reply - 1 : $i + 9999;
+      my @t = @$reply[$i .. $end];
+      my @v = $self->_mget(\@t);
+      die "bayes: seen fetch failed" unless @v;
+      for (my $i = 0; $i < @v; $i++) {
+	next unless defined $v[$i];
+        print "s\t$v[$i]\t".substr($t[$i], 2)."\n";
+      }
+    }
+  });
+  $self->{redis}->wait_all_responses;
+
+  $self->untie_db();
+
+  return 1;
+}
+
+=head2 restore_database
+
+public instance (Boolean) restore_database (String $filename, Boolean $showdots)
+
+Description:
+This method restores a database from the given filename, C<$filename>.
+
+Callers should be aware that any errors returned by this method
+could causes the database to be inconsistent for the given user.
+
+=cut
+
+sub restore_database {
+  my ($self, $filename, $showdots) = @_;
+
+  local *DUMPFILE;
+  if (!open(DUMPFILE, '<', $filename)) {
+    warn("bayes: unable to open backup file $filename: $!");
+    return 0;
+  }
+
+  # This is the critical phase (moving sql around), so don't allow it
+  # to be interrupted.
+  #local $SIG{'INT'} = 'IGNORE';
+  #local $SIG{'HUP'} = 'IGNORE'
+  #  if !Mail::SpamAssassin::Util::am_running_on_windows();
+  #local $SIG{'TERM'} = 'IGNORE';
+
+  unless ($self->clear_database()) {
+    return 0;
+  }
+
+  unless ($self->tie_db_writable()) {
+    return 0;
+  }
+
+  my $token_count = 0;
+  my $db_version;
+  my $num_spam = 0;
+  my $num_ham = 0;
+  my $line_count = 0;
+
+  my $line = <DUMPFILE>;
+  defined $line  or die "Error reading dump file: $!";
+  $line_count++;
+  # We require the database version line to be the first in the file so we can
+  # figure out how to properly deal with the file.  If it is not the first
+  # line then fail
+  if ($line =~ m/^v\s+(\d+)\s+db_version/) {
+    $db_version = $1;
+  } else {
+    warn("bayes: database version must be the first line in the backup file, correct and re-run");
+    return 0;
+  }
+
+  unless ($db_version == 2 || $db_version == 3) {
+    warn("bayes: database version $db_version is unsupported, must be version 2 or 3\n");
+    return 0;
+  }
+
+  my $curtime = time;
+  my $q_cnt = 0;
+
+  for ($!=0; defined($line=<DUMPFILE>); $!=0) {
+    chomp($line);
+    $line_count++;
+
+    if ($showdots && $line_count % 1000 == 0) {
+      print STDERR "." if $showdots;
+    }
+
+    if ($line =~ /^t\s+/) { # token line
+      my @parsed_line = split(/\s+/, $line, 5);
+      my $spam_count = $parsed_line[1] + 0;
+      my $ham_count = $parsed_line[2] + 0;
+      my $token = $parsed_line[4];
+
+      $spam_count = 0 if $spam_count < 0;
+      $ham_count = 0 if $ham_count < 0;
+
+      next if $spam_count == 0 && $ham_count == 0;
+
+      if ($db_version < 3) {
+        # versions < 3 use plain text tokens, so we need to convert to hash
+        $token = substr(sha1($token), -5);
+      } else {
+        # turn unpacked binary token back into binary value
+        $token = pack("H*",$token);
+      }
+
+      $self->_set_p("t:$token", _pack_token($spam_count, $ham_count),
+                    $self->{expire_token});
+      $self->{redis}->wait_all_responses if ++$q_cnt % 10000 == 0;
+      $token_count++;
+    } elsif ($line =~ /^s\s+/) { # seen line
+      my @parsed_line = split(/\s+/, $line, 3);
+      my $flag = $parsed_line[1];
+      my $msgid = $parsed_line[2];
+
+      unless ($flag eq 'h' || $flag eq 's') {
+        dbg("bayes: unknown seen flag ($flag) for line: $line, skipping");
+        next;
+      }
+
+      unless ($msgid) {
+        dbg("bayes: blank msgid for line: $line, skipping");
+        next;
+      }
+
+      $self->_set_p("s:$msgid", $flag, $self->{expire_seen});
+      $self->{redis}->wait_all_responses if ++$q_cnt % 10000 == 0;
+    }
+    elsif ($line =~ /^v\s+/) {     # variable line
+      my @parsed_line = split(/\s+/, $line, 3);
+      my $value = $parsed_line[1] + 0;
+      if ($parsed_line[2] eq 'num_spam') {
+        $num_spam = $value;
+      } elsif ($parsed_line[2] eq 'num_nonspam') {
+        $num_ham = $value;
+      } else {
+        dbg("bayes: restore_database: skipping unknown line: $line");
+      }
+    } else {
+      dbg("bayes: skipping unknown line: $line");
+      next;
+    }
+  }
+
+  defined $line || $!==0  or
+    $!==EBADF ? dbg("bayes: error reading dump file: $!")
+      : die "error reading dump file: $!";
+  close(DUMPFILE) or die "Can't close dump file: $!";
+
+  $self->{redis}->wait_all_responses;
+
+  print STDERR "\n" if $showdots;
+
+  if ($num_spam <= 0 && $num_ham <= 0) {
+    warn("bayes: no num_spam/num_ham found, aborting");
+    return 0;
+  }
+  else {
+    $self->nspam_nham_change($num_spam, $num_ham);
+  }
+
+  dbg("bayes: parsed $line_count lines");
+  dbg("bayes: created database with $token_count tokens based on $num_spam spam messages and $num_ham ham messages");
+
+  $self->untie_db();
+
+  return 1;
+}
+
+=head2 db_readable
+
+public instance (Boolean) db_readable()
+
+Description:
+This method returns a boolean value indicating if the database is in a
+readable state.
+
+=cut
+
+sub db_readable {
+  my($self) = @_;
+
+  return $self->{is_really_open} && $self->{is_officially_open};
+}
+
+=head2 db_writable
+
+public instance (Boolean) db_writable()
+
+Description:
+This method returns a boolean value indicating if the database is in a
+writable state.
+
+=cut
+
+sub db_writable {
+  my($self) = @_;
+
+  return $self->{is_really_open} && $self->{is_officially_open} &&
+           $self->{is_writable};
+}
+
+# token marshalling format for tokens
+# pack CC for values <256, VV for the rest, keep it simple
+
+sub _unpack_token {
+  my $value = shift;
+
+  my ($ts, $th);
+
+  if (length($value) == 2) {
+    ($ts, $th) = unpack("CC", $value);
+  }
+  elsif (length($value) == 8) {
+    ($ts, $th) = unpack("VV", $value);
+  }
+  else {
+    dbg("bayes: unknown token format: ".unpack("H*", $value));
+  }
+
+  return ($ts||0, $th||0);
+}
+
+sub _pack_token {
+  my($ts, $th) = @_;
+
+  if ($ts < 256 && $th < 256) {
+    return pack("CC", $ts, $th);
+  } else {
+    return pack("VV", $ts, $th);
+  }
+}
+
+#
+# Redis functions
+#
+
+sub _get {
+  my ($self, $key) = @_;
+
+  my $value;
+
+  my $err = $self->{timer}->run_and_catch(sub {
+    $value = $self->{redis}->get($key);
+  });
+
+  if ($self->{timer}->timed_out()) {
+    die("bayes: get timed out!");
+  }
+  elsif ($err) {
+    $err =~ s! at /.*!!s; # skip full trace
+    die("bayes: get failed: $err");
+  }
+
+  return $value;
+}
+
+sub _mget {
+  my ($self, $keys) = @_;
+
+  my @values;
+
+  my $err = $self->{timer}->run_and_catch(sub {
+    @values = $self->{redis}->mget(@$keys);
+  });
+
+  if ($self->{timer}->timed_out()) {
+    die("bayes: mget timed out!");
+  }
+  elsif ($err) {
+    $err =~ s! at /.*!!s; # skip full trace
+    die("bayes: mget failed: $err");
+  }
+
+  return @values;
+}
+
+sub _set {
+  my ($self, $key, $value, $expire) = @_;
+
+  my $err = $self->{timer}->run_and_catch(sub {
+    if (defined $expire) {
+      $self->{redis}->setex($key, $expire, $value);
+    } else {
+      $self->{redis}->set($key, $value);
+    }
+  });
+
+  if ($self->{timer}->timed_out()) {
+    die("bayes: set timed out!");
+  }
+  elsif ($err) {
+    $err =~ s! at /.*!!s; # skip full trace
+    die("bayes: set failed: $err");
+  }
+
+  return 1;
+}
+
+# Pipelined set, must call _wait_all_responses after
+sub _set_p {
+  my ($self, $key, $value, $expire) = @_;
+
+  if (defined $expire) {
+    $self->{redis}->setex($key, $expire, $value, sub {});
+  } else {
+    $self->{redis}->set($key, $value, sub {});
+  }
+
+  return 1;
+}
+
+# Pipelined del, must call _wait_all_responses after
+sub _del_p {
+  my ($self, $key) = @_;
+
+  $self->{redis}->del($key, sub {});
+
+  return 1;
+}
+
+# Pipelined expire, must call _wait_all_responses after
+sub _expire_p {
+  my ($self, $key, $expire) = @_;
+
+  $self->{redis}->expire($key, $expire, sub {});
+
+  return 1;
+}
+
+sub _wait_all_responses {
+  my ($self) = @_;
+
+  my $err = $self->{timer}->run_and_catch(sub {
+    $self->{redis}->wait_all_responses;
+  });
+
+  if ($self->{timer}->timed_out()) {
+    die("bayes: wait_all_responses timed out!");
+  }
+  elsif ($err) {
+    $err =~ s! at /.*!!s; # skip full trace
+    die("bayes: wait_all_responses failed: $err");
+  }
+
+  return 1;
+}
+
+sub _mset {
+  my ($self, $values) = @_;
+
+  my $err = $self->{timer}->run_and_catch(sub {
+    $self->{redis}->mset(@$values);
+  });
+
+  if ($self->{timer}->timed_out()) {
+    die("bayes: mset timed out!");
+  }
+  elsif ($err) {
+    $err =~ s! at /.*!!s; # skip full trace
+    die("bayes: mset failed: $err");
+  }
+
+  return 1;
+}
+
+sub _del {
+  my ($self, $key) = @_;
+
+  my $err = $self->{timer}->run_and_catch(sub {
+    $self->{redis}->del($key);
+  });
+
+  if ($self->{timer}->timed_out()) {
+    die("bayes: del timed out!");
+  }
+  elsif ($err) {
+    $err =~ s! at /.*!!s; # skip full trace
+    die("bayes: mset failed: $err");
+  }
+
+  return 1;
+}
+
+sub sa_die { Mail::SpamAssassin::sa_die(@_); }
+
+1;