You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spamassassin.apache.org by mm...@apache.org on 2013/09/05 20:32:33 UTC

svn commit: r1520380 - /spamassassin/trunk/lib/Mail/SpamAssassin/BayesStore/Redis.pm

Author: mmartinec
Date: Thu Sep  5 18:32:32 2013
New Revision: 1520380

URL: http://svn.apache.org/r1520380
Log:
Bug 6972: BayesStore/Redis.pm: ditching CPAN module Redis gains speed, avoids its bugs, works on IPv6

Modified:
    spamassassin/trunk/lib/Mail/SpamAssassin/BayesStore/Redis.pm

Modified: spamassassin/trunk/lib/Mail/SpamAssassin/BayesStore/Redis.pm
URL: http://svn.apache.org/viewvc/spamassassin/trunk/lib/Mail/SpamAssassin/BayesStore/Redis.pm?rev=1520380&r1=1520379&r2=1520380&view=diff
==============================================================================
--- spamassassin/trunk/lib/Mail/SpamAssassin/BayesStore/Redis.pm (original)
+++ spamassassin/trunk/lib/Mail/SpamAssassin/BayesStore/Redis.pm Thu Sep  5 18:32:32 2013
@@ -25,20 +25,38 @@ Mail::SpamAssassin::BayesStore::Redis - 
 
 This module implementes a Redis based bayesian storage module.
 
-A redis server with a Lua support (2.6 or higher) is strongly recommended
+A redis server with Lua support (2.6 or higher) is recommended
 for performance reasons.
 
 The bayes_sql_dsn config variable has been hijacked for our purposes:
 
   bayes_sql_dsn
 
-    Optional config parameters sent as is to Redis->new().
-    Example: server=localhost:6379;password=foo;reconnect=20
+    Optional config parameters affecting a connection to a redis server.
 
-    By default encoding=undef is set as suggested by Redis module.
+    This is a semicolon-separated list of option=value pairs, where an option
+    can be: server, password, database. Unrecognized options are silently
+    ignored.
+
+    The 'server' option specifies a socket on which a redis server is
+    listening. It can be an absolute path of a Unix socket, or a host:port
+    pair, where a host can be an IPv4 or IPv6 address or a host name.
+    An IPv6 address must be enclosed in brackets, e.g. [::1]:6379
+    (IPv6 support in a redis server is available since version 2.8.0).
+    The default is to connect to an INET socket at 127.0.0.1, port 6379.
+
+    The value of a 'password' option is sent in an AUTH command to a redis
+    server on connecting if a server requests authentication. A password is
+    sent in plain text and a redis server only offers an optional rudimentary
+    authentication. To limit access to a redis server use its 'bind' option
+    to bind to a specific interface (typically to a loopback interface),
+    or use a host-based firewall.
+
+    The value of a 'database' option can be a non-negative (small) integer,
+    which is passed to a redis server with a SELECT command on connecting,
+    and chooses a sub-database index. The default database index is 0.
 
-    To use non-default database id, use "database=x". This is not passed
-    to new(), but specially handled to call Redis->select($id).
+    Example: server=localhost:6379;password=foo;database=2
 
   bayes_token_ttl
 
@@ -54,17 +72,221 @@ The bayes_sql_dsn config variable has be
 
 Expiry is done internally in Redis using *_ttl settings mentioned above,
 but only if bayes_auto_expire is true (which is a default).  This is
-why --force-expire etc does nothing and token counts and atime values
-are shown zero in statistics.
+why --force-expire etc does nothing, and token counts and atime values
+are shown as zero in statistics.
 
 LIMITATIONS: Only global bayes storage is implemented, per-user bayes is
-not available. Dumping (sa-learn --backup) of a very large database may
-not be possible due to memory limitations and inefficient full database
-traversal mechanism. This backend storage module is new with SpamAssassin
-3.4.0 and may be revised in future versions as more experience is gained.
+not currently available. Dumping (sa-learn --backup, or --dump) of a huge
+database may not be possible if its keys do not all fit into process memory.
 
 =cut
 
+package Mail::SpamAssassin::BayesStore::TinyRedis;
+# Implements the new unified request protocol, introduced in Redis 1.2.
+
+use strict;
+use re 'taint';
+use warnings;
+
+use Errno qw(EINTR EAGAIN EPIPE ENOTCONN ECONNRESET ECONNABORTED);
+use IO::Socket::UNIX;
+
+our $io_socket_module_name;
+BEGIN {
+  if (eval { require IO::Socket::IP }) {
+    $io_socket_module_name = 'IO::Socket::IP';
+  } elsif (eval { require IO::Socket::INET6 }) {
+    $io_socket_module_name = 'IO::Socket::INET6';
+  } elsif (eval { require IO::Socket::INET }) {
+    $io_socket_module_name = 'IO::Socket::INET';
+  }
+}
+
+sub new {
+  my($class, %args) = @_;
+  my $self = bless { args => {%args} }, $class;
+  my $outbuf = ''; $self->{outbuf} = \$outbuf;
+  $self->{batch_size} = 0;
+  $self->{server} = $args{server} || $args{sock} || '127.0.0.1:6379';
+  $self->{on_connect} = $args{on_connect};
+  return if !$self->connect;
+  $self;
+}
+
+sub DESTROY {
+  my $self = $_[0];
+  local($@, $!, $_);
+  undef $self->{sock};
+}
+
+sub disconnect {
+  my $self = $_[0];
+  local($@, $!);
+  undef $self->{sock};
+}
+
+sub connect {
+  my $self = $_[0];
+
+  $self->disconnect;
+  my $sock;
+  my $server = $self->{server};
+  if ($server =~ m{^/}) {
+    $sock = IO::Socket::UNIX->new(
+              Peer => $server, Type => SOCK_STREAM);
+  } else {
+    $sock = $io_socket_module_name->new(
+              PeerAddr => $server, Proto => 'tcp');
+  }
+  if ($sock) {
+    $self->{sock} = $sock;
+
+    $self->{sock_fd} = $sock->fileno; $self->{fd_mask} = '';
+    vec($self->{fd_mask}, $self->{sock_fd}, 1) = 1;
+
+    # an on_connect() callback must not use batched calls!
+    $self->{on_connect}->($self)  if $self->{on_connect};
+  }
+  $sock;
+}
+
+# Receive, parse and return $cnt consecutive redis replies as a list.
+#
+sub _response {
+  my($self, $cnt) = @_;
+
+  my $sock = $self->{sock};
+  if (!$sock) {
+    $self->connect  or die "Connect failed: $!";
+    $sock = $self->{sock};
+  };
+
+  my @list;
+
+  for (1 .. $cnt) {
+
+    my $result = <$sock>;
+    if (!defined $result) {
+      $self->disconnect;
+      die "Error reading from Redis server: $!";
+    }
+    chomp $result;
+    my $resp_type = substr($result, 0, 1, '');
+
+    if ($resp_type eq '$') {  # bulk reply
+      if ($result < 0) {
+        push(@list, undef);  # null bulk reply
+      } else {
+        my $data = ''; my $ofs = 0; my $len = $result + 2;
+        while ($len > 0) {
+          my $nbytes = read($sock, $data, $len, $ofs);
+          if (!$nbytes) {
+            $self->disconnect;
+            defined $nbytes  or die "Error reading from Redis server: $!";
+            die "Redis server closed connection";
+          }
+          $ofs += $nbytes; $len -= $nbytes;
+        }
+        chomp $data;
+        push(@list, $data);
+      }
+
+    } elsif ($resp_type eq ':') {  # integer reply
+      push(@list, 0+$result);
+
+    } elsif ($resp_type eq '+') {  # status reply
+      push(@list, $result);
+
+    } elsif ($resp_type eq '*') {  # multi-bulk reply
+      push(@list, $result < 0 ? undef : $self->_response(0+$result) );
+
+    } elsif ($resp_type eq '-') {  # error reply
+      die "$result\n";
+
+    } else {
+      die "Unknown Redis reply: $resp_type ($result)";
+    }
+  }
+  \@list;
+}
+
+sub _write_buff {
+  my($self, $bufref) = @_;
+
+  if (!$self->{sock}) { $self->connect or die "Connect failed: $!" };
+  my $nwrite;
+  for (my $ofs = 0; $ofs < length($$bufref); $ofs += $nwrite) {
+    # to reliably detect a disconnect we need to check for an input event
+    # using a select; checking status of syswrite is not sufficient
+    my($rout, $wout, $inbuff); my $fd_mask = $self->{fd_mask};
+    my $nfound = select($rout=$fd_mask, $wout=$fd_mask, undef, undef);
+    defined $nfound && $nfound >= 0 or die "Select failed: $!";
+    if (vec($rout, $self->{sock_fd}, 1) &&
+        !sysread($self->{sock}, $inbuff, 1024)) {
+      # eof, try reconnecting
+      $self->connect  or die "Connect failed: $!";
+    }
+    local $SIG{PIPE} = 'IGNORE';  # don't signal on a write to a widowed pipe
+    $nwrite = syswrite($self->{sock}, $$bufref, length($$bufref)-$ofs, $ofs);
+    next if defined $nwrite;
+    $nwrite = 0;
+    if ($! == EINTR || $! == EAGAIN) {  # no big deal, try again
+      Time::HiRes::sleep(0.1);  # slow down, just in case
+    } else {
+      $self->disconnect;
+      if ($! == ENOTCONN   || $! == EPIPE ||
+          $! == ECONNRESET || $! == ECONNABORTED) {
+        $self->connect  or die "Connect failed: $!";
+      } else {
+        die "Error writing to redis socket: $!";
+      }
+    }
+  }
+  1;
+}
+
+# Send a redis command with arguments, returning a redis reply.
+#
+sub call {
+  my $self = shift;
+
+  my $buff = '*' . scalar(@_) . "\015\012";
+  $buff .= '$' . length($_) . "\015\012" . $_ . "\015\012"  for @_;
+
+  $self->_write_buff(\$buff);
+  local($/) = "\015\012";
+  my $arr_ref = $self->_response(1);
+  $arr_ref && $arr_ref->[0];
+}
+
+# Append a redis command with arguments to a batch.
+#
+sub b_call {
+  my $self = shift;
+
+  my $bufref = $self->{outbuf};
+  $$bufref .= '*' . scalar(@_) . "\015\012";
+  $$bufref .= '$' . length($_) . "\015\012" . $_ . "\015\012"  for @_;
+  ++ $self->{batch_size};
+}
+
+# Send a batch of commands, returning an arrayref of redis replies,
+# each array element corresponding to one command in a batch.
+#
+sub b_results {
+  my $self = $_[0];
+  my $batch_size = $self->{batch_size};
+  return if !$batch_size;
+  my $bufref = $self->{outbuf};
+  $self->_write_buff($bufref);
+  $$bufref = ''; $self->{batch_size} = 0;
+  local($/) = "\015\012";
+  $self->_response($batch_size);
+}
+
+1;
+
+
 package Mail::SpamAssassin::BayesStore::Redis;
 
 use strict;
@@ -90,10 +312,6 @@ BEGIN {
   @ISA = qw( Mail::SpamAssassin::BayesStore );
 }
 
-# Support for "SCRIPT LOAD" command is needed, provided by Redis version 1.954
-# Method on_connect() is available since Redis version 1.956 .
-use constant HAS_REDIS => eval { require Redis; Redis->VERSION(1.956) };
-
 =head1 METHODS
 
 =head2 new
@@ -112,22 +330,18 @@ sub new {
   $class = ref($class) || $class;
   my $self = $class->SUPER::new(@_);
 
-  unless (HAS_REDIS) {
-    dbg("bayes: unable to connect to database: Redis module not available");
-  }
-
   my $bconf = $self->{bayes}->{conf};
 
   foreach (split(';', $bconf->{bayes_sql_dsn})) {
     my ($a, $b) = split('=');
-    unless (defined $b) {
+    if (!defined $b) {
       warn("bayes: invalid bayes_sql_dsn config\n");
       return;
-    }
-    if ($a eq 'database') {
+    } elsif ($a eq 'database') {
       $self->{db_id} = $b;
-    }
-    else {
+    } elsif ($a eq 'password') {
+      $self->{password} = $b;
+    } else {
       push @{$self->{redis_conf}}, $a => $b eq 'undef' ?
         undef : untaint_var($b);
     }
@@ -161,7 +375,7 @@ sub new {
 sub disconnect {
   my($self) = @_;
   if ($self->{connected}) {
-    local($@, $!, $_);
+    local($@, $!);
     dbg("bayes: Redis disconnect");
     $self->{connected} = 0; undef $self->{redis};
   }
@@ -173,24 +387,25 @@ sub DESTROY {
   $self->{connected} = 0; undef $self->{redis};
 }
 
-# called from a Redis module on Redis->new and on automatic re-connect
+# Called from TinyRedis::connect, i.e. on TinyRedis->new and on re-connect.
+# The on_connect() callback must not use batched calls!
 sub on_connect {
   my($self, $r) = @_;
-
-  dbg("bayes: Redis on-connect");
-
-  if ($self->{db_id}) {  # defined and nonzero
-    # work around a Redis module bug:
-    #   Select new database doesn't survive after reconnect.
-    #   https://github.com/melo/perl-redis/issues/38
-    eval {
-      $r->select($self->{db_id}); 1;
-    } or do {
-      $@ =~ s{\s+at /.*}{}s;
-      $self->disconnect;
-      die "Redis error during database select(): $@\n";
-    };
-  }
+  my $db_id = $self->{db_id} || 0;
+  dbg("bayes: Redis on-connect, db_id %d", $db_id);
+  eval {
+    $r->call('SELECT', $db_id) eq 'OK' ? 1 : 0;
+  } or do {
+    if ($@ =~ /\bNOAUTH\b/) {
+      defined $self->{password}
+        or die "Redis server requires authentication, no password provided";
+      $r->call('AUTH', $self->{password});
+      $r->call('SELECT', $db_id);
+    } else {
+      chomp $@; die "Redis error: $@";
+    }
+  };
+  $r->call('CLIENT', 'SETNAME', 'sa['.$$.']');
   1;
 }
 
@@ -201,24 +416,20 @@ sub connect {
   undef $self->{redis};  # just in case
 
   my $err = $self->{timer}->run_and_catch(sub {
-    my $mypid = $self->{opened_from_pid} = $$;
+    $self->{opened_from_pid} = $$;
     # will keep a persistent session open to a redis server
-    $self->{redis} = Redis->new(
-                       name => 'sa[' . $mypid . ']',
+    $self->{redis} = Mail::SpamAssassin::BayesStore::TinyRedis->new(
                        @{$self->{redis_conf}},
-                       encoding => undef,
-                       on_connect => sub { my($r) = @_; $self->on_connect($r) },
+                       on_connect => sub { $self->on_connect(@_) },
                      );
+    $self->{redis} or die "Error: $!";
   });
   if ($self->{timer}->timed_out()) {
-    warn("bayes: Redis connection timed out!");
     undef $self->{redis};
-    return;
+    die "bayes: Redis connection timed out!";
   } elsif ($err) {
-    $err =~ s{ at /.*}{}s; # skip full trace
-    warn("bayes: Redis connection failed: $err");
     undef $self->{redis};
-    return;
+    die "bayes: Redis failed: $err";
   }
   $self->{connected} = 1;
 }
@@ -236,8 +447,6 @@ forking off child processes.
 sub prefork_init {
   my ($self) = @_;
 
-  HAS_REDIS or return;
-
   # Each child process must establish its own connection with a Redis server,
   # re-using a common forked socket leads to serious trouble (garbled data).
   #
@@ -265,21 +474,19 @@ This optional method is called in a chil
 sub spamd_child_init {
   my ($self) = @_;
 
-  HAS_REDIS or return;
-
   # Each child process must establish its own connection with a Redis server,
   # re-using a common forked socket leads to serious trouble (garbled data).
   #
   # Just in case the parent master process did not call prefork_init() above,
   # we try to silently renounce the use of existing cloned connection here.
-  # As the prefork_init plugin callback has only been in introduced in
+  # As the prefork_init plugin callback has only been introduced in
   # SpamAssassin 3.4.0, this situation can arrise in case of some third party
   # software (or a pre-3.4.0 version of spamd) is somehow using this plugin.
   # Better safe than sorry...
 
   if ($self->{connected}) {
     dbg("bayes: spamd_child_init, closing a parent's session ".
-        "with a Redis server in a child process");
+        "to a Redis server in a child process");
     $self->untie_db;
     $self->disconnect;  # just drop it, don't shut down parent's session
   }
@@ -297,8 +504,6 @@ This method ensures that the database co
 sub tie_db_readonly {
   my($self) = @_;
 
-  HAS_REDIS or return;
-
   $self->{is_writable} = 0;
   my $success;
   if ($self->{connected}) {
@@ -324,8 +529,6 @@ begin using the database immediately.
 sub tie_db_writable {
   my($self) = @_;
 
-  HAS_REDIS or return;
-
   $self->{is_writable} = 0;
   my $success;
   if ($self->{connected}) {
@@ -353,9 +556,8 @@ the database immediately.
 sub _open_db {
   my($self) = @_;
 
-  dbg("bayes: _open_db(%s); Redis %s",
-      $self->{connected} ? 'already connected' : 'not yet connected',
-      Redis->VERSION);
+  dbg("bayes: _open_db(%s)",
+      $self->{connected} ? 'already connected' : 'not yet connected');
 
   if ($self->{connected}) {
     $self->{is_officially_open} = 1;
@@ -363,31 +565,28 @@ sub _open_db {
   }
 
   $self->read_db_configs();
-
   $self->connect;
 
-  my $have_lua = $self->{have_lua};
-  if (!$self->{redis_server_version}) {
-    my $info = $self->{info} = $self->{redis}->info;
-    if ($info) {
-      $self->{redis_server_version} = $info->{redis_version};
-      $have_lua = $self->{have_lua} = 1  if exists $info->{used_memory_lua};
-
-      dbg("bayes: redis server version %s, memory used %.1f MiB%s",
-          $info->{redis_version}, $info->{used_memory}/1024/1024,
-          !$have_lua ? '' : ", Lua is available");
-    }
-    if (!$have_lua) {
-      warn "bayes: Redis server does not support Lua, ".
-           "upgrade or expect slower operation\n";
+  if (!defined $self->{redis_server_version}) {
+    my $info = $self->{info} = $self->{redis}->call("INFO");
+    if (defined $info) {
+      my $redis_mem; local $1;
+      $self->{redis_server_version} =
+                          $info =~ /^redis_version:\s*(.*?)\r?$/m ? $1 : '';
+      $self->{have_lua} = $info =~ /^used_memory_lua:/m ? 1 : 0;
+      $redis_mem = $1  if $info =~ /^used_memory:\s*(.*?)\r?$/m;
+      dbg("bayes: redis server version %s, memory used %.1f MiB, Lua %s",
+          $self->{redis_server_version}, $redis_mem/1024/1024,
+          $self->{have_lua} ? 'is available' : 'is not available');
     }
   }
 
-  $self->{db_version} = $self->_get('v:DB_VERSION');
+  $self->{db_version} = $self->{redis}->call('GET', 'v:DB_VERSION');
 
   if (!$self->{db_version}) {
     $self->{db_version} = $self->DB_VERSION;
-    my $ret = $self->{redis}->mset('v:DB_VERSION', $self->{db_version},
+    my $ret = $self->{redis}->call('MSET',
+                                   'v:DB_VERSION', $self->{db_version},
                                    'v:NSPAM', 0,
                                    'v:NHAM', 0,
                                    'v:TOKEN_FORMAT', 2 );
@@ -403,7 +602,7 @@ sub _open_db {
       warn("bayes: bayes db version $self->{db_version} not supported, aborting\n");
       return 0;
     }
-    my $token_format = $self->_get('v:TOKEN_FORMAT') || 0;
+    my $token_format = $self->{redis}->call('GET', 'v:TOKEN_FORMAT') || 0;
     if ($token_format < 2) {
       warn("bayes: bayes old token format $token_format not supported, ".
            "consider backup/restore or initialize a database\n");
@@ -411,7 +610,7 @@ sub _open_db {
     }
   }
 
-  if ($have_lua && !defined $self->{multi_hmget_script}) {
+  if ($self->{have_lua} && !defined $self->{multi_hmget_script}) {
     $self->_define_lua_scripts;
   }
 
@@ -480,7 +679,7 @@ is not found.
 sub seen_get {
   my($self, $msgid) = @_;
 
-  return $self->_get("s:$msgid");
+  return $self->{redis}->call('GET', "s:$msgid");
 }
 
 =head2 seen_put
@@ -496,7 +695,13 @@ of two values 's' for spam and 'h' for h
 sub seen_put {
   my($self, $msgid, $flag) = @_;
 
-  $self->_set("s:$msgid", $flag, $self->{expire_seen});
+  my $r = $self->{redis};
+  if ($self->{expire_seen}) {
+    $r->call('SETEX', "s:$msgid", $self->{expire_seen}, $flag);
+  } else {
+    $r->call('SET',   "s:$msgid", $flag);
+  }
+
   return 1;
 }
 
@@ -512,7 +717,7 @@ This method removes C<$msgid> from the d
 sub seen_delete {
   my($self, $msgid) = @_;
 
-  $self->_del("s:$msgid");
+  $self->{redis}->call('DEL', "s:$msgid");
   return 1;
 }
 
@@ -549,8 +754,7 @@ sub get_storage_variables {
                  OLDEST_TOKEN_AGE DB_VERSION LAST_JOURNAL_SYNC
                  LAST_ATIME_DELTA LAST_EXPIRE_REDUCE NEWEST_TOKEN_AGE
                  TOKEN_FORMAT}  if !@varnames;
-  @varnames = map("v:$_", @varnames);
-  my $values = $self->_mget(\@varnames);
+  my $values = $self->{redis}->call('MGET', map('v:'.$_, @varnames));
   return if !$values;
   return map(defined $_ ? $_ : 0, @$values);
 }
@@ -563,9 +767,6 @@ Description:
 This method determines if an expire is currently running and returns
 the last time set.
 
-There can be multiple times, so we just pull the greatest (most recent)
-value.
-
 =cut
 
 sub get_running_expire_tok {
@@ -637,56 +838,53 @@ sub tok_get_all {
 
   if (! $self->{have_lua} ) {
 
-    foreach my $token (@_) {
-      $r->hmget('w:'.$token, 's', 'h', sub {
-        my($values, $error) = @_;
-        return if !$values || @$values != 2;
-        return if !$values->[0] && !$values->[1];
-        push(@values, [$token, $values->[0]||0, $values->[1]||0, 0]);
-        1;
-      });
+    $r->b_call('HMGET', 'w:'.$_, 's', 'h')  for @_;
+    my $results = $r->b_results;
+
+    if (@$results != @_) {
+      $self->disconnect;
+      die sprintf("bayes: tok_get_all got %d entries, expected %d\n",
+                  scalar @$results, scalar @_);
+    }
+    for my $j (0 .. $#$results) {
+      my($s,$h) = @{$results->[$j]};
+      push(@values, [$_[$j], ($s||0)+0, ($h||0)+0, 0])  if $s || $h;
     }
-    $self->_wait_all_responses;
 
-  } else {  # have Lua, faster
+  } else {  # have Lua
 
     # no need for cryptographical strength, just checking for protocol errors
     my $nonce = sprintf("%06x", rand(0xffffff));
 
-    my @results;
+    my $result;
     eval {
-      @results = $r->evalsha($self->{multi_hmget_script},
-                             scalar @_, @_, $nonce);
+      $result = $r->call('EVALSHA', $self->{multi_hmget_script},
+                         scalar @_, map('w:'.$_, @_), $nonce);
       1;
     } or do {  # Lua script probably not cached, define again and re-try
-      if ($@ !~ /^\Q[evalsha] NOSCRIPT\E/) {
+      if ($@ !~ /^NOSCRIPT/) {
         $self->disconnect;
         die "bayes: Redis LUA error: $@\n";
       }
       $self->_define_lua_scripts;
-      @results = $r->evalsha($self->{multi_hmget_script},
-                             scalar @_, @_, $nonce);
+      $result = $r->call('EVALSHA', $self->{multi_hmget_script},
+                         scalar @_, map('w:'.$_, @_), $nonce);
     };
-    my $r_nonce = $results[1];
-    if (@results != 2) {
-      $self->disconnect;
-      die sprintf("bayes: tok_get_all expected 2 results, got %d\n",
-                  scalar @results);
-    } elsif ($r_nonce ne $nonce) {
-      # check for redis protocol falling out of step
+    my @items = split(' ', $result);
+    my $r_nonce = pop(@items);
+    if ($r_nonce ne $nonce) {
+      # redis protocol error?
       $self->disconnect;
       die sprintf("bayes: tok_get_all nonce mismatch, expected %s, got %s\n",
                   $nonce, defined $r_nonce ? $r_nonce : 'UNDEF');
+    } elsif (@items != @_) {
+      $self->disconnect;
+      die sprintf("bayes: tok_get_all got %d entries, expected %d\n",
+                  scalar @items, scalar @_);
     } else {
-      @results = split(' ', $results[0]);
-      if (@results != @_) {
-        $self->disconnect;
-        die sprintf("bayes: tok_get_all got %d entries, expected %d\n",
-                    scalar @results, scalar @_);
-      }
-      foreach my $token (@_) {
-        my($s,$h) = split(m{/}, shift @results, 2);
-        push(@values, [$token, ($s||0)+0, ($h||0)+0, 0])  if $s || $h;
+      for my $j (0 .. $#items) {
+        my($s,$h) = split(m{/}, $items[$j], 2);
+        push(@values, [$_[$j], ($s||0)+0, ($h||0)+0, 0])  if $s || $h;
       }
     }
   }
@@ -734,69 +932,43 @@ sub multi_tok_count_change {
   $dham  ||= 0;
   # the increment must be an integer, otherwise redis returns an error
 
-  my $ttl = $self->{expire_token};  # time-to-live, in seconds
-
   dbg("bayes: multi_tok_count_change learning %d spam, %d ham",
       $dspam, $dham);
 
-  $self->connect if !$self->{connected};
-
-  if ($self->{have_lua}) {
-
-    my $r = $self->{redis};
-    my @tokens_list = keys %$tokens;
-    my $ntokens = scalar @tokens_list;
-    my $cnt;
-    eval {
-      $cnt = $r->evalsha($self->{multi_hincrby},
-                         $ntokens, @tokens_list, $dspam, $dham, $ttl);
-      1;
-    } or do {  # Lua script probably not cached, define again and re-try
-      if ($@ !~ /^\Q[evalsha] NOSCRIPT\E/) {
-        $self->disconnect;
-        die "bayes: Redis LUA error: $@\n";
-      }
-      $self->_define_lua_scripts;
-      $cnt = $r->evalsha($self->{multi_hincrby},
-                         $ntokens, @tokens_list, $dspam, $dham, $ttl);
-    };
-    if ($cnt != $ntokens) {
-      $self->disconnect;
-      die sprintf("bayes: multi_tok_count_change got %d, expected %d\n",
-                  $cnt, $ntokens);
-    }
+  my $ttl = $self->{expire_token};  # time-to-live, in seconds
 
-  } else {  # no Lua, slower
+  $self->connect if !$self->{connected};
+  my $r = $self->{redis};
 
-    if ($dspam > 0 || $dham > 0) {  # learning
-      while (my($token,$v) = each(%$tokens)) {
-        $self->_hincrby_p('w:'.$token, 's', $dspam)  if $dspam > 0;
-        $self->_hincrby_p('w:'.$token, 'h', $dham)   if $dham  > 0;
-        $self->_expire_p('w:'.$token, $ttl)  if $ttl;
-      }
-      $self->_wait_all_responses;
+  if ($dspam > 0 || $dham > 0) {  # learning
+    while (my($token,$v) = each(%$tokens)) {
+      my $key = 'w:'.$token;
+      $r->b_call('HINCRBY', $key, 's', int $dspam) if $dspam > 0;
+      $r->b_call('HINCRBY', $key, 'h', int $dham)  if $dham  > 0;
+      $r->b_call('EXPIRE',  $key, $ttl)  if $ttl;
     }
+    $r->b_results;  # collect response, ignoring results
+  }
 
-    if ($dspam < 0 || $dham < 0) {  # unlearning - rare, not as efficient
-      while (my($token,$v) = each(%$tokens)) {
-        if ($dspam < 0) {
-          my $result = $self->_hincrby('w:'.$token, 's', int $dspam);
-          if (!$result || $result <= 0) {
-            $self->_hdel_p('w:'.$token, 's');
-          } elsif ($ttl) {
-            $self->_expire_p('w:'.$token, $ttl);
-          }
+  if ($dspam < 0 || $dham < 0) {  # unlearning - rare, not as efficient
+    while (my($token,$v) = each(%$tokens)) {
+      my $key = 'w:'.$token;
+      if ($dspam < 0) {
+        my $result = $r->call('HINCRBY', $key, 's', int $dspam);
+        if (!$result || $result <= 0) {
+          $r->call('HDEL',   $key, 's');
+        } elsif ($ttl) {
+          $r->call('EXPIRE', $key, $ttl);
         }
-        if ($dham < 0) {
-          my $result = $self->_hincrby('w:'.$token, 'h', int $dham);
-          if (!$result || $result <= 0) {
-            $self->_hdel_p('w:'.$token, 'h');
-          } elsif ($ttl) {
-            $self->_expire_p('w:'.$token, $ttl);
-          }
+      }
+      if ($dham < 0) {
+        my $result = $r->call('HINCRBY', $key, 'h', int $dham);
+        if (!$result || $result <= 0) {
+          $r->call('HDEL',   $key, 'h');
+        } elsif ($ttl) {
+          $r->call('EXPIRE', $key, $ttl);
         }
       }
-      $self->_wait_all_responses;
     }
   }
 
@@ -837,10 +1009,12 @@ sub nspam_nham_change {
   return 1 unless $ds || $dh;
 
   $self->connect if !$self->{connected};
+  my $r = $self->{redis};
 
   my $err = $self->{timer}->run_and_catch(sub {
-    $self->{redis}->incrby("v:NSPAM", $ds) if $ds;
-    $self->{redis}->incrby("v:NHAM", $dh) if $dh;
+    $r->b_call('INCRBY', "v:NSPAM", $ds) if $ds;
+    $r->b_call('INCRBY', "v:NHAM",  $dh) if $dh;
+    $r->b_results;  # collect response, ignoring results
   });
 
   if ($self->{timer}->timed_out()) {
@@ -848,7 +1022,6 @@ sub nspam_nham_change {
     die("bayes: Redis connection timed out!");
   }
   elsif ($err) {
-    $err =~ s{ at /.*}{}s; # skip full trace
     $self->disconnect;
     die("bayes: failed to increment nspam $ds nham $dh: $err");
   }
@@ -891,42 +1064,25 @@ sub tok_touch_all {
   my($self, $tokens, $newatime) = @_;
 
   my $ttl = $self->{expire_token};  # time-to-live, in seconds
-  return 1 unless defined $ttl;
+  return 1  unless $ttl && $tokens && @$tokens;
 
   dbg("bayes: tok_touch_all setting expire to %s on %d tokens",
       $ttl, scalar @$tokens);
 
   $self->connect if !$self->{connected};
+  my $r = $self->{redis};
 
-  # We just refresh TTL on all
-  if (! $self->{have_lua} ) {
+  # Benchmarks for a 'with-Lua' vs. a 'batched non-Lua' case show the same
+  # speed, so for simplicity we only kept the batched non-Lua code. Note that
+  # this only applies to our own implementation of the Redis client protocol,
+  # which offers efficient command batching (pipelining) - with the Redis
+  # CPAN module the batched case would be worse by about 33% on average.
 
-    $self->_expire_p("w:$_", $ttl) for @$tokens;
-    $self->_wait_all_responses;
+  # We just refresh TTL on all
 
-  } else {  # have Lua, faster
+  $r->b_call('EXPIRE', 'w:'.$_, $ttl) for @$tokens;
+  $r->b_results;  # collect response, ignoring results
 
-    my $r = $self->{redis};
-    my $cnt;
-    eval {
-      $cnt = $r->evalsha($self->{multi_expire_script},
-                         scalar @$tokens, @$tokens, $ttl);
-      1;
-    } or do {  # Lua script probably not cached, define again and re-try
-      if ($@ !~ /^\Q[evalsha] NOSCRIPT\E/) {
-        $self->disconnect;
-        die "bayes: Redis LUA error: $@\n";
-      }
-      $self->_define_lua_scripts;
-      $cnt = $r->evalsha($self->{multi_expire_script},
-                         scalar @$tokens, @$tokens, $ttl);
-    };
-    if ($cnt != @$tokens) {
-      $self->disconnect;
-      die sprintf("bayes: tok_touch_all got %d, expected %d\n",
-                  $cnt, scalar @$tokens);
-    }
-  }
   return 1;
 }
 
@@ -998,7 +1154,8 @@ sub clear_database {
   my($self) = @_;
 
   # TODO
-  warn("bayes: you need to manually clear Redis database\n");
+  warn("bayes: note: assuming the database is empty; ".
+       "to manually clear a database: redis-cli -n <db-ind> FLUSHDB\n");
 
   return 1;
 }
@@ -1022,60 +1179,55 @@ sub dump_db_toks {
 
   my $atime = time;  # fake
 
-  # Sadly it's impossible to prevent Redis-module itself keeping all
-  # resulting keys in memory.
-  my @keys;
-
   # let's get past this terrible command as fast as possible
   # (ignoring $regex which makes no sense with SHA digests)
-  @keys = $r->keys('w:*');
-  dbg("bayes: fetched %d token keys", scalar @keys);
+  my $keys = $r->call('KEYS', 'w:*');
+  dbg("bayes: fetched %d token keys", scalar @$keys);
 
   # process tokens in chunks of 1000
-  for (my $i = 0; $i <= $#keys; $i += 1000) {
-    my $end = $i + 999 >= $#keys ? $#keys : $i + 999;
+  for (my $i = 0; $i <= $#$keys; $i += 1000) {
+    my $end = $i + 999 >= $#$keys ? $#$keys : $i + 999;
 
     my @tokensdata;
     if (! $self->{have_lua}) {  # no Lua, 3-times slower
 
       for (my $j = $i; $j <= $end; $j++) {
-        my $token = $keys[$j];
-        $r->hmget($token, 's', 'h', sub {
-          my($val, $error) = @_;
-          push(@tokensdata, [ substr($token,2), $val->[0]||0, $val->[1]||0 ])
-            if $val && @$val == 2;
-          1;
-        });
+        $r->b_call('HMGET', $keys->[$j], 's', 'h');
+      }
+      my $j = $i;
+      my $itemslist_ref = $r->b_results;
+      foreach my $item ( @$itemslist_ref ) {
+        my($s,$h) = @$item;
+        push(@tokensdata,
+             [ substr($keys->[$j],2), ($s||0)+0, ($h||0)+0 ])  if $s || $h;
+        $j++;
       }
-      $self->_wait_all_responses;
 
     } else {  # have_lua
 
       my $nonce = sprintf("%06x", rand(0xffffff));
-      my @tokens = map(substr($_,2), @keys[$i .. $end]);  # strip leading "w:"
-      my @results = $r->evalsha($self->{multi_hmget_script},
-                                scalar @tokens, @tokens, $nonce);
-      my $r_nonce = $results[1];
-      if (@results != 2) {
+      my @tokens = @{$keys}[$i .. $end];
+      my $result = $r->call('EVALSHA', $self->{multi_hmget_script},
+                            scalar @tokens, @tokens, $nonce);
+      my @items = split(' ', $result);
+      my $r_nonce = pop(@items);
+      if (!defined $r_nonce) {
         $self->disconnect;
-        die sprintf("bayes: dump_db_toks expected 2 results, got %d\n",
-                    scalar @results);
+        die "bayes: dump_db_toks received no results\n";
       } elsif ($r_nonce ne $nonce) {
-        # check for redis protocol falling out of step
+        # redis protocol error?
         $self->disconnect;
-        die sprintf("bayes: dump_db_toks nonce mismatch, expected %s, got %s\n",
+        die sprintf("bayes: dump_db_toks nonce mismatch, ".
+                    "expected %s, got %s\n",
                     $nonce, defined $r_nonce ? $r_nonce : 'UNDEF');
-      } else {
-        @results = split(' ', $results[0]);
-        if (@results != @tokens) {
-          $self->disconnect;
-          die sprintf("bayes: dump_db_toks got %d entries, expected %d\n",
-                         scalar @results, scalar @tokens);
-        }
-        @results = split(' ', $results[0]);
-        @tokensdata = map { my($s,$h) = split(m{/}, shift @results, 2);
-                            [ $_, $s||0, $h||0 ] } @tokens;
+      } elsif (@items != @tokens) {
+        $self->disconnect;
+        die sprintf("bayes: dump_db_toks got %d entries, expected %d\n",
+                       scalar @items, scalar @tokens);
       }
+      # stripping a leading "w:"
+      @tokensdata = map { my($s,$h) = split(m{/}, shift @items, 2);
+                          [ substr($_,2), ($s||0)+0, ($h||0)+0 ] } @tokens;
     }
 
     my $probabilities_ref =
@@ -1091,6 +1243,7 @@ sub dump_db_toks {
         or die "Error writing tokens: $!";
     }
   }
+  dbg("bayes: written token keys");
 
   $self->untie_db();
 
@@ -1119,84 +1272,73 @@ sub backup_database {
   print "v\t$vars[1]\tnum_spam\n";
   print "v\t$vars[2]\tnum_nonspam\n";
 
-  # Sadly it's impossible to prevent Redis-module itself keeping all
-  # resulting keys in memory.
-  my @keys;
-
   # let's get past this terrible command as fast as possible
-  @keys = $r->keys('w:*');
-  dbg("bayes: fetched %d token keys", scalar @keys);
+  my $keys = $r->call('KEYS', 'w:*');
+  dbg("bayes: fetched %d token keys", scalar @$keys);
 
   # process tokens in chunks of 1000
-  for (my $i = 0; $i <= $#keys; $i += 1000) {
-    my $end = $i + 999 >= $#keys ? $#keys : $i + 999;
+  for (my $i = 0; $i <= $#$keys; $i += 1000) {
+    my $end = $i + 999 >= $#$keys ? $#$keys : $i + 999;
 
     if (! $self->{have_lua}) {  # no Lua, slower
 
       for (my $j = $i; $j <= $end; $j++) {
-        my $token = $keys[$j];
-        $r->hmget($token, 's', 'h', sub {
-          my($values, $error) = @_;
-          return if !$values || @$values != 2;
-          return if !$values->[0] && !$values->[1];
-          my $encoded = unpack("H*", substr($token, 2));
-          printf("t\t%d\t%d\t%s\t%s\n",
-                 $values->[0]||0, $values->[1]||0, $atime, $encoded);
-          1;
-        });
+        $r->b_call('HMGET', $keys->[$j], 's', 'h');
+      }
+      my $j = $i;
+      my $itemslist_ref = $r->b_results;
+      foreach my $item ( @$itemslist_ref ) {
+        my $encoded = unpack("H*", substr($keys->[$j++], 2));
+        my($s,$h) = @$item;
+        printf("t\t%d\t%d\t%s\t%s\n",
+               $s||0, $h||0, $atime, $encoded)  if $s || $h;
       }
-      $self->_wait_all_responses;
 
     } else {  # have_lua
 
       my $nonce = sprintf("%06x", rand(0xffffff));
-      my @tokens = map(substr($_,2), @keys[$i .. $end]);  # strip leading "w:"
-      my @results = $r->evalsha($self->{multi_hmget_script},
-                                scalar @tokens, @tokens, $nonce);
-      my $r_nonce = $results[1];
-      if (@results != 2) {
+      my @tokens = @{$keys}[$i .. $end];
+      my $result = $r->call('EVALSHA', $self->{multi_hmget_script},
+                            scalar @tokens, @tokens, $nonce);
+      my @items = split(' ', $result);
+      my $r_nonce = pop(@items);
+      if (!defined $r_nonce) {
         $self->disconnect;
-        die sprintf("bayes: backup_database expected 2 results, got %d\n",
-                    scalar @results);
+        die "bayes: backup_database received no results\n";
       } elsif ($r_nonce ne $nonce) {
-        # check for redis protocol falling out of step
+        # redis protocol error?
         $self->disconnect;
         die sprintf("bayes: backup_database nonce mismatch, ".
                     "expected %s, got %s\n",
                     $nonce, defined $r_nonce ? $r_nonce : 'UNDEF');
-      } else {
-        @results = split(' ', $results[0]);
-        if (@results != @tokens) {
-          $self->disconnect;
-          die sprintf("bayes: backup_database got %d entries, expected %d\n",
-                         scalar @results, scalar @tokens);
-        }
-        foreach my $token (@tokens) {
-          my($s,$h) = split(m{/}, shift @results, 2);
-          next if !$s && !$h;
-          my $encoded = unpack("H*", $token);
-          printf("t\t%d\t%d\t%s\t%s\n", $s||0, $h||0, $atime, $encoded);
-        }
+      } elsif (@items != @tokens) {
+        $self->disconnect;
+        die sprintf("bayes: backup_database got %d entries, expected %d\n",
+                       scalar @items, scalar @tokens);
+      }
+      foreach my $token (@tokens) {
+        my($s,$h) = split(m{/}, shift @items, 2);
+        next if !$s && !$h;
+        my $encoded = unpack("H*", substr($token,2));  # strip leading "w:"
+        printf("t\t%d\t%d\t%s\t%s\n", $s||0, $h||0, $atime, $encoded);
       }
     }
   }
+  dbg("bayes: written token keys");
 
-  @keys = $r->keys('s:*');
-  dbg("bayes: fetched %d seen keys", scalar @keys);
+  $keys = $r->call('KEYS', 's:*');
+  dbg("bayes: fetched %d seen keys", scalar @$keys);
 
-  for (my $i = 0; $i <= $#keys; $i += 1000) {
-    my $end = $i + 999 >= $#keys ? $#keys : $i + 999;
-    my @t = @keys[$i .. $end];
-    my $v = $self->_mget(\@t);
-    if (!$v || !@$v) {
-      $self->disconnect;
-      die "bayes: seen fetch failed";
-    }
+  for (my $i = 0; $i <= $#$keys; $i += 1000) {
+    my $end = $i + 999 >= $#$keys ? $#$keys : $i + 999;
+    my @t = @{$keys}[$i .. $end];
+    my $v = $r->call('MGET', @t);
     for (my $i = 0; $i < @$v; $i++) {
       next unless defined $v->[$i];
       printf("s\t%s\t%s\n", $v->[$i], substr($t[$i], 2));
     }
   }
+  dbg("bayes: written seen keys");
 
   $self->untie_db();
 
@@ -1224,19 +1366,13 @@ sub restore_database {
     return 0;
   }
 
-  # This is the critical phase (moving sql around), so don't allow it
-  # to be interrupted.
-  #local $SIG{'INT'} = 'IGNORE';
-  #local $SIG{'HUP'} = 'IGNORE'
-  #  if !Mail::SpamAssassin::Util::am_running_on_windows();
-  #local $SIG{'TERM'} = 'IGNORE';
-
   unless ($self->clear_database()) {
     return 0;
   }
 
   return 0 unless $self->tie_db_writable;
   $self->connect if !$self->{connected};
+  my $r = $self->{redis};
 
   my $token_count = 0;
   my $db_version;
@@ -1264,6 +1400,8 @@ sub restore_database {
 
   my $curtime = time;
   my $q_cnt = 0;
+  my $token_ttl = $self->{expire_token};  # possibly undefined
+  my $seen_ttl  = $self->{expire_seen};   # possibly undefined
 
   for ($!=0; defined($line=<DUMPFILE>); $!=0) {
     chomp($line);
@@ -1282,7 +1420,7 @@ sub restore_database {
       $spam_count = 0 if $spam_count < 0;
       $ham_count = 0 if $ham_count < 0;
 
-      next if $spam_count == 0 && $ham_count == 0;
+      next if !$spam_count && !$ham_count;
 
       if ($db_version < 3) {
         # versions < 3 use plain text tokens, so we need to convert to hash
@@ -1292,16 +1430,24 @@ sub restore_database {
         $token = pack("H*",$token);
       }
       my $key = 'w:'.$token;
-      $self->_hincrby_p($key, 's', int $spam_count) if $spam_count > 0;
-      $self->_hincrby_p($key, 'h', int $ham_count)  if $ham_count  > 0;
-      $self->_expire_p($key, $self->{expire_token})
-        if defined $self->{expire_token};
+      $r->b_call('HINCRBY', $key, 's', int $spam_count) if $spam_count > 0;
+      $r->b_call('HINCRBY', $key, 'h', int $ham_count)  if $ham_count  > 0;
+
+      if ($token_ttl) {
+        # by introducing some randomness (ttl times a factor of 0.7 .. 1.7),
+        # we avoid auto-expiration of many tokens all at once, which would
+        # otherwise cause an unnecessary load spike on a redis server
+        $r->b_call('EXPIRE', $key, int($token_ttl * (rand()+0.7)));
+      }
+
+      # collect response every now and then, ignoring results
+      $r->b_results  if ++$q_cnt % 1000 == 0;
 
-      $self->{redis}->wait_all_responses if ++$q_cnt % 10000 == 0;
       $token_count++;
+
     } elsif ($line =~ /^s\s+/) { # seen line
       my @parsed_line = split(/\s+/, $line, 3);
-      my $flag = $parsed_line[1];
+      my $flag  = $parsed_line[1];
       my $msgid = $parsed_line[2];
 
       unless ($flag eq 'h' || $flag eq 's') {
@@ -1314,10 +1460,19 @@ sub restore_database {
         next;
       }
 
-      $self->_set_p("s:$msgid", $flag, $self->{expire_seen});
-      $self->{redis}->wait_all_responses if ++$q_cnt % 10000 == 0;
-    }
-    elsif ($line =~ /^v\s+/) {     # variable line
+      if (!$seen_ttl) {
+        $r->b_call('SET', "s:$msgid", $flag);
+      } else {
+        # by introducing some randomness (ttl times a factor of 0.7 .. 1.7),
+        # we avoid auto-expiration of many 'seen' entries all at once, which
+        # would otherwise cause an unnecessary load spike on a redis server
+        $r->b_call('SETEX', "s:$msgid", int($seen_ttl * (rand()+0.7)), $flag);
+      }
+
+      # collect response every now and then, ignoring results
+      $r->b_results  if ++$q_cnt % 1000 == 0;
+
+    } elsif ($line =~ /^v\s+/) {  # variable line
       my @parsed_line = split(/\s+/, $line, 3);
       my $value = $parsed_line[1] + 0;
       if ($parsed_line[2] eq 'num_spam') {
@@ -1327,19 +1482,20 @@ sub restore_database {
       } else {
         dbg("bayes: restore_database: skipping unknown line: $line");
       }
+
     } else {
       dbg("bayes: skipping unknown line: $line");
       next;
     }
   }
 
+  $r->b_results;  # collect any remaining response, ignoring results
+
   defined $line || $!==0  or
     $!==EBADF ? dbg("bayes: error reading dump file: $!")
       : die "error reading dump file: $!";
   close(DUMPFILE) or die "Can't close dump file: $!";
 
-  $self->{redis}->wait_all_responses;
-
   print STDERR "\n" if $showdots;
 
   if ($num_spam <= 0 && $num_ham <= 0) {
@@ -1351,7 +1507,8 @@ sub restore_database {
   }
 
   dbg("bayes: parsed $line_count lines");
-  dbg("bayes: created database with $token_count tokens based on $num_spam spam messages and $num_ham ham messages");
+  dbg("bayes: created database with $token_count tokens ".
+      "based on $num_spam spam messages and $num_ham ham messages");
 
   $self->untie_db();
 
@@ -1397,272 +1554,34 @@ sub db_writable {
 sub _define_lua_scripts {
   my $self = shift;
   dbg("bayes: defining Lua scripts");
+
   $self->connect if !$self->{connected};
   my $r = $self->{redis};
 
-  $self->{multi_hmget_script} = $r->script_load(<<'END');
+  $self->{multi_hmget_script} = $r->call('SCRIPT', 'LOAD', <<'END');
     local rcall = redis.call
-    local r = {}
     local nonce = ARGV[1]
+    local KEYS = KEYS
+    local r = {}
     for j = 1, #KEYS do
-      local sh = rcall("HMGET", "w:" .. KEYS[j], "s", "h")
+      local sh = rcall("HMGET", KEYS[j], "s", "h")
       -- returns counts as a list of spam/ham pairs, zeroes may be omitted
       local s, h = sh[1] or "0", sh[2] or "0"
       local pair
       if h == "0" then
         pair = s  -- just a spam field, possibly zero; a ham field omitted
       elseif s == "0" then
-        pair = "/" .. h  -- just a ham field, zero in spam suppressed
+        pair = "/" .. h  -- just a ham field, zero in a spam field suppressed
       else
         pair = s .. "/" .. h
       end
       r[#r+1] = pair
     end
+    r[#r+1] = nonce
     -- return counts as a single string, avoids overhead of multiresult parsing
-    return { table.concat(r," "), nonce }
+    return table.concat(r," ")
 END
-
-  $self->{multi_expire_script} = $r->script_load(<<'END');
-    local ttl = ARGV[1]
-    local rcall = redis.call
-    for j = 1, #KEYS do
-      rcall("EXPIRE", "w:" .. KEYS[j], ttl)
-    end
-    return #KEYS
-END
-
-  $self->{multi_hincrby} = $r->script_load(<<'END');
-    local s, h, ttl = ARGV[1], ARGV[2], ARGV[3]
-    local set_expire = ttl and tonumber(ttl) > 0
-    local rcall = redis.call
-    if tonumber(s) ~= 0 then
-      for j = 1, #KEYS do
-        local token = "w:" .. KEYS[j]
-        local cnt = rcall("HINCRBY", token, "s", s)
-        if cnt <= 0 then
-          rcall("HDEL", token, "s")
-        elseif set_expire then
-          rcall("EXPIRE", token, ttl)
-        end
-      end
-    end
-    if tonumber(h) ~= 0 then
-      for j = 1, #KEYS do
-        local token = "w:" .. KEYS[j]
-        local cnt = rcall("HINCRBY", token, "h", h)
-        if cnt <= 0 then
-          rcall("HDEL", token, "h")
-        elseif set_expire then
-          rcall("EXPIRE", token, ttl)
-        end
-      end
-    end
-    return #KEYS
-END
-
   1;
 }
 
-sub _get {
-  my ($self, $key) = @_;
-
-  my $value;
-
-  my $err = $self->{timer}->run_and_catch(sub {
-    $value = $self->{redis}->get($key);
-  });
-
-  if ($self->{timer}->timed_out()) {
-    $self->disconnect;
-    die("bayes: get timed out!");
-  }
-  elsif ($err) {
-    $err =~ s{ at /.*}{}s; # skip full trace
-    $self->disconnect;
-    die("bayes: get failed: $err");
-  }
-
-  return $value;
-}
-
-sub _mget {
-  my ($self, $keys) = @_;
-
-  my @values;
-
-  my $err = $self->{timer}->run_and_catch(sub {
-    @values = $self->{redis}->mget(@$keys);
-  });
-
-  if ($self->{timer}->timed_out()) {
-    $self->disconnect;
-    die("bayes: mget timed out!");
-  }
-  elsif ($err) {
-    $err =~ s{ at /.*}{}s; # skip full trace
-    $self->disconnect;
-    die("bayes: mget failed: $err");
-  }
-
-  return \@values;
-}
-
-sub _hmget {
-  my ($self, $key, @fields) = @_;
-
-  my $value;
-  my $err = $self->{timer}->run_and_catch(sub {
-    $value = $self->{redis}->hmget($key, @fields);
-  });
-
-  if ($self->{timer}->timed_out()) {
-    $self->disconnect;
-    die("bayes: hmget timed out!");
-  }
-  elsif ($err) {
-    $err =~ s{ at /.*}{}s; # skip full trace
-    $self->disconnect;
-    die("bayes: hmget failed: $err");
-  }
-
-  return $value;
-}
-
-sub _set {
-  my ($self, $key, $value, $expire) = @_;
-
-  my $err = $self->{timer}->run_and_catch(sub {
-    if (defined $expire) {
-      $self->{redis}->setex($key, $expire, $value);
-    } else {
-      $self->{redis}->set($key, $value);
-    }
-  });
-
-  if ($self->{timer}->timed_out()) {
-    $self->disconnect;
-    die("bayes: set timed out!");
-  }
-  elsif ($err) {
-    $err =~ s{ at /.*}{}s; # skip full trace
-    $self->disconnect;
-    die("bayes: set failed: $err");
-  }
-
-  return 1;
-}
-
-sub _hincrby {
-  my ($self, $key, $field, $incr) = @_;
-
-  my $err = $self->{timer}->run_and_catch(sub {
-    $self->{redis}->hincrby($key, $field, $incr);
-  });
-
-  if ($self->{timer}->timed_out()) {
-    $self->disconnect;
-    die("bayes: hincrby timed out!");
-  }
-  elsif ($err) {
-    $err =~ s{ at /.*}{}s; # skip full trace
-    $self->disconnect;
-    die("bayes: hincrby failed: $err");
-  }
-
-  return 1;
-}
-
-# Pipelined set, must call _wait_all_responses after
-sub _set_p {
-  my ($self, $key, $value, $expire) = @_;
-
-  if (defined $expire) {
-    $self->{redis}->setex($key, $expire, $value, sub {});
-  } else {
-    $self->{redis}->set($key, $value, sub {});
-  }
-
-  return 1;
-}
-
-# Pipelined hincrby, must call _wait_all_responses after
-sub _hincrby_p {
-  my ($self, $key, $field, $incr) = @_;
-
-  $self->{redis}->hincrby($key, $field, $incr, sub {});
-
-  return 1;
-}
-
-# Pipelined del, must call _wait_all_responses after
-sub _del_p {
-  my ($self, $key) = @_;
-
-  $self->{redis}->del($key, sub {});
-
-  return 1;
-}
-
-# Pipelined hdel, must call _wait_all_responses after
-sub _hdel_p {
-  my ($self, $key, $field) = @_;
-
-  $self->{redis}->hdel($key, $field, sub {});
-
-  return 1;
-}
-
-# Pipelined expire, must call _wait_all_responses after
-sub _expire_p {
-  my ($self, $key, $expire) = @_;
-
-  if (defined $expire) {
-    $self->{redis}->expire($key, $expire, sub {});
-  }
-
-  return 1;
-}
-
-sub _wait_all_responses {
-  my ($self) = @_;
-
-  my $err = $self->{timer}->run_and_catch(sub {
-    $self->{redis}->wait_all_responses;
-  });
-
-  if ($self->{timer}->timed_out()) {
-    $self->disconnect;
-    die sprintf("bayes: wait_all_responses timed out! called from line %s\n",
-                (caller)[2]);
-  }
-  elsif ($err) {
-    $err =~ s{ at /.*}{}s; # skip full trace
-    $self->disconnect;
-    die sprintf("bayes: wait_all_responses failed: %s, called from line %s\n",
-                $err, (caller)[2]);
-  }
-
-  return 1;
-}
-
-sub _del {
-  my ($self, $key) = @_;
-
-  my $err = $self->{timer}->run_and_catch(sub {
-    $self->{redis}->del($key);
-  });
-
-  if ($self->{timer}->timed_out()) {
-    $self->disconnect;
-    die("bayes: del timed out!");
-  }
-  elsif ($err) {
-    $err =~ s{ at /.*}{}s; # skip full trace
-    $self->disconnect;
-    die("bayes: del failed: $err");
-  }
-
-  return 1;
-}
-
 1;