You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spamassassin.apache.org by mm...@apache.org on 2013/09/05 20:32:33 UTC
svn commit: r1520380 -
/spamassassin/trunk/lib/Mail/SpamAssassin/BayesStore/Redis.pm
Author: mmartinec
Date: Thu Sep 5 18:32:32 2013
New Revision: 1520380
URL: http://svn.apache.org/r1520380
Log:
Bug 6972: BayesStore/Redis.pm: ditching CPAN module Redis gains speed, avoids its bugs, works on IPv6
Modified:
spamassassin/trunk/lib/Mail/SpamAssassin/BayesStore/Redis.pm
Modified: spamassassin/trunk/lib/Mail/SpamAssassin/BayesStore/Redis.pm
URL: http://svn.apache.org/viewvc/spamassassin/trunk/lib/Mail/SpamAssassin/BayesStore/Redis.pm?rev=1520380&r1=1520379&r2=1520380&view=diff
==============================================================================
--- spamassassin/trunk/lib/Mail/SpamAssassin/BayesStore/Redis.pm (original)
+++ spamassassin/trunk/lib/Mail/SpamAssassin/BayesStore/Redis.pm Thu Sep 5 18:32:32 2013
@@ -25,20 +25,38 @@ Mail::SpamAssassin::BayesStore::Redis -
This module implementes a Redis based bayesian storage module.
-A redis server with a Lua support (2.6 or higher) is strongly recommended
+A redis server with a Lua support (2.6 or higher) is recommended
for performance reasons.
The bayes_sql_dsn config variable has been hijacked for our purposes:
bayes_sql_dsn
- Optional config parameters sent as is to Redis->new().
- Example: server=localhost:6379;password=foo;reconnect=20
+ Optional config parameters affecting a connection to a redis server.
- By default encoding=undef is set as suggested by Redis module.
+ This is a semicolon-separated list of option=value pairs, where an option
+ can be: server, password, database. Unrecognized options are silently
+ ignored.
+
+ The 'server' option specifies a socket on which a redis server is
+ listening. It can be an absolute path of a Unix socket, or a host:port
+ pair, where a host can be an IPv4 or IPv6 address or a host name.
+ An IPv6 address must be enclosed in brackets, e.g. [::1]:6379
+ (IPv6 support in a redis server is available since version 2.8.0).
+ A default is to connect to an INET socket at 127.0.0.1, port 6379.
+
+ The value of a 'password' option is sent in an AUTH command to a redis
+ server on connecting if a server requests authentication. A password is
+ sent in plain text and a redis server only offers an optional rudimentary
+ authentication. To limit access to a redis server use its 'bind' option
+ to bind to a specific interface (typically to a loopback interface),
+ or use a host-based firewall.
+
+ The value of a 'database' option can be an non-negative (small) integer,
+ which is passed to a redis server with a SELECT command on connecting,
+ and chooses a sub-database index. A default database index is 0.
- To use non-default database id, use "database=x". This is not passed
- to new(), but specially handled to call Redis->select($id).
+ Example: server=localhost:6379;password=foo;database=2
bayes_token_ttl
@@ -54,17 +72,221 @@ The bayes_sql_dsn config variable has be
Expiry is done internally in Redis using *_ttl settings mentioned above,
but only if bayes_auto_expire is true (which is a default). This is
-why --force-expire etc does nothing and token counts and atime values
-are shown zero in statistics.
+why --force-expire etc does nothing, and token counts and atime values
+are shown as zero in statistics.
LIMITATIONS: Only global bayes storage is implemented, per-user bayes is
-not available. Dumping (sa-learn --backup) of a very large database may
-not be possible due to memory limitations and inefficient full database
-traversal mechanism. This backend storage module is new with SpamAssassin
-3.4.0 and may be revised in future versions as more experience is gained.
+not currently available. Dumping (sa-learn --backup, or --dump) of a huge
+database may not be possible if all keys do not fit into process memory.
=cut
+package Mail::SpamAssassin::BayesStore::TinyRedis;
+# Implements the new unified request protocol, introduced in Redis 1.2 .
+
+use strict;
+use re 'taint';
+use warnings;
+
+use Errno qw(EINTR EAGAIN EPIPE ENOTCONN ECONNRESET ECONNABORTED);
+use IO::Socket::UNIX;
+
+our $io_socket_module_name;
+BEGIN {
+ if (eval { require IO::Socket::IP }) {
+ $io_socket_module_name = 'IO::Socket::IP';
+ } elsif (eval { require IO::Socket::INET6 }) {
+ $io_socket_module_name = 'IO::Socket::INET6';
+ } elsif (eval { require IO::Socket::INET }) {
+ $io_socket_module_name = 'IO::Socket::INET';
+ }
+}
+
+sub new {
+ my($class, %args) = @_;
+ my $self = bless { args => {%args} }, $class;
+ my $outbuf = ''; $self->{outbuf} = \$outbuf;
+ $self->{batch_size} = 0;
+ $self->{server} = $args{server} || $args{sock} || '127.0.0.1:6379';
+ $self->{on_connect} = $args{on_connect};
+ return if !$self->connect;
+ $self;
+}
+
+sub DESTROY {
+ my $self = $_[0];
+ local($@, $!, $_);
+ undef $self->{sock};
+}
+
+sub disconnect {
+ my $self = $_[0];
+ local($@, $!);
+ undef $self->{sock};
+}
+
+sub connect {
+ my $self = $_[0];
+
+ $self->disconnect;
+ my $sock;
+ my $server = $self->{server};
+ if ($server =~ m{^/}) {
+ $sock = IO::Socket::UNIX->new(
+ Peer => $server, Type => SOCK_STREAM);
+ } else {
+ $sock = $io_socket_module_name->new(
+ PeerAddr => $server, Proto => 'tcp');
+ }
+ if ($sock) {
+ $self->{sock} = $sock;
+
+ $self->{sock_fd} = $sock->fileno; $self->{fd_mask} = '';
+ vec($self->{fd_mask}, $self->{sock_fd}, 1) = 1;
+
+ # an on_connect() callback must not use batched calls!
+ $self->{on_connect}->($self) if $self->{on_connect};
+ }
+ $sock;
+}
+
+# Receive, parse and return $cnt consecutive redis replies as a list.
+#
+sub _response {
+ my($self, $cnt) = @_;
+
+ my $sock = $self->{sock};
+ if (!$sock) {
+ $self->connect or die "Connect failed: $!";
+ $sock = $self->{sock};
+ };
+
+ my @list;
+
+ for (1 .. $cnt) {
+
+ my $result = <$sock>;
+ if (!defined $result) {
+ $self->disconnect;
+ die "Error reading from Redis server: $!";
+ }
+ chomp $result;
+ my $resp_type = substr($result, 0, 1, '');
+
+ if ($resp_type eq '$') { # bulk reply
+ if ($result < 0) {
+ push(@list, undef); # null bulk reply
+ } else {
+ my $data = ''; my $ofs = 0; my $len = $result + 2;
+ while ($len > 0) {
+ my $nbytes = read($sock, $data, $len, $ofs);
+ if (!$nbytes) {
+ $self->disconnect;
+ defined $nbytes or die "Error reading from Redis server: $!";
+ die "Redis server closed connection";
+ }
+ $ofs += $nbytes; $len -= $nbytes;
+ }
+ chomp $data;
+ push(@list, $data);
+ }
+
+ } elsif ($resp_type eq ':') { # integer reply
+ push(@list, 0+$result);
+
+ } elsif ($resp_type eq '+') { # status reply
+ push(@list, $result);
+
+ } elsif ($resp_type eq '*') { # multi-bulk reply
+ push(@list, $result < 0 ? undef : $self->_response(0+$result) );
+
+ } elsif ($resp_type eq '-') { # error reply
+ die "$result\n";
+
+ } else {
+ die "Unknown Redis reply: $resp_type ($result)";
+ }
+ }
+ \@list;
+}
+
+sub _write_buff {
+ my($self, $bufref) = @_;
+
+ if (!$self->{sock}) { $self->connect or die "Connect failed: $!" };
+ my $nwrite;
+ for (my $ofs = 0; $ofs < length($$bufref); $ofs += $nwrite) {
+ # to reliably detect a disconnect we need to check for an input event
+ # using a select; checking status of syswrite is not sufficient
+ my($rout, $wout, $inbuff); my $fd_mask = $self->{fd_mask};
+ my $nfound = select($rout=$fd_mask, $wout=$fd_mask, undef, undef);
+ defined $nfound && $nfound >= 0 or die "Select failed: $!";
+ if (vec($rout, $self->{sock_fd}, 1) &&
+ !sysread($self->{sock}, $inbuff, 1024)) {
+ # eof, try reconnecting
+ $self->connect or die "Connect failed: $!";
+ }
+ local $SIG{PIPE} = 'IGNORE'; # don't signal on a write to a widowed pipe
+ $nwrite = syswrite($self->{sock}, $$bufref, length($$bufref)-$ofs, $ofs);
+ next if defined $nwrite;
+ $nwrite = 0;
+ if ($! == EINTR || $! == EAGAIN) { # no big deal, try again
+ Time::HiRes::sleep(0.1); # slow down, just in case
+ } else {
+ $self->disconnect;
+ if ($! == ENOTCONN || $! == EPIPE ||
+ $! == ECONNRESET || $! == ECONNABORTED) {
+ $self->connect or die "Connect failed: $!";
+ } else {
+ die "Error writing to redis socket: $!";
+ }
+ }
+ }
+ 1;
+}
+
+# Send a redis command with arguments, returning a redis reply.
+#
+sub call {
+ my $self = shift;
+
+ my $buff = '*' . scalar(@_) . "\015\012";
+ $buff .= '$' . length($_) . "\015\012" . $_ . "\015\012" for @_;
+
+ $self->_write_buff(\$buff);
+ local($/) = "\015\012";
+ my $arr_ref = $self->_response(1);
+ $arr_ref && $arr_ref->[0];
+}
+
+# Append a redis command with arguments to a batch.
+#
+sub b_call {
+ my $self = shift;
+
+ my $bufref = $self->{outbuf};
+ $$bufref .= '*' . scalar(@_) . "\015\012";
+ $$bufref .= '$' . length($_) . "\015\012" . $_ . "\015\012" for @_;
+ ++ $self->{batch_size};
+}
+
+# Send a batch of commands, returning an arrayref of redis replies,
+# each array element corresponding to one command in a batch.
+#
+sub b_results {
+ my $self = $_[0];
+ my $batch_size = $self->{batch_size};
+ return if !$batch_size;
+ my $bufref = $self->{outbuf};
+ $self->_write_buff($bufref);
+ $$bufref = ''; $self->{batch_size} = 0;
+ local($/) = "\015\012";
+ $self->_response($batch_size);
+}
+
+1;
+
+
package Mail::SpamAssassin::BayesStore::Redis;
use strict;
@@ -90,10 +312,6 @@ BEGIN {
@ISA = qw( Mail::SpamAssassin::BayesStore );
}
-# Support for "SCRIPT LOAD" command is needed, provided by Redis version 1.954
-# Method on_connect() is available since Redis version 1.956 .
-use constant HAS_REDIS => eval { require Redis; Redis->VERSION(1.956) };
-
=head1 METHODS
=head2 new
@@ -112,22 +330,18 @@ sub new {
$class = ref($class) || $class;
my $self = $class->SUPER::new(@_);
- unless (HAS_REDIS) {
- dbg("bayes: unable to connect to database: Redis module not available");
- }
-
my $bconf = $self->{bayes}->{conf};
foreach (split(';', $bconf->{bayes_sql_dsn})) {
my ($a, $b) = split('=');
- unless (defined $b) {
+ if (!defined $b) {
warn("bayes: invalid bayes_sql_dsn config\n");
return;
- }
- if ($a eq 'database') {
+ } elsif ($a eq 'database') {
$self->{db_id} = $b;
- }
- else {
+ } elsif ($a eq 'password') {
+ $self->{password} = $b;
+ } else {
push @{$self->{redis_conf}}, $a => $b eq 'undef' ?
undef : untaint_var($b);
}
@@ -161,7 +375,7 @@ sub new {
sub disconnect {
my($self) = @_;
if ($self->{connected}) {
- local($@, $!, $_);
+ local($@, $!);
dbg("bayes: Redis disconnect");
$self->{connected} = 0; undef $self->{redis};
}
@@ -173,24 +387,25 @@ sub DESTROY {
$self->{connected} = 0; undef $self->{redis};
}
-# called from a Redis module on Redis->new and on automatic re-connect
+# Called from a Redis module on Redis->new and on automatic re-connect.
+# The on_connect() callback must not use batched calls!
sub on_connect {
my($self, $r) = @_;
-
- dbg("bayes: Redis on-connect");
-
- if ($self->{db_id}) { # defined and nonzero
- # work around a Redis module bug:
- # Select new database doesn't survive after reconnect.
- # https://github.com/melo/perl-redis/issues/38
- eval {
- $r->select($self->{db_id}); 1;
- } or do {
- $@ =~ s{\s+at /.*}{}s;
- $self->disconnect;
- die "Redis error during database select(): $@\n";
- };
- }
+ my $db_id = $self->{db_id} || 0;
+ dbg("bayes: Redis on-connect, db_id %d", $db_id);
+ eval {
+ $r->call('SELECT', $db_id) eq 'OK' ? 1 : 0;
+ } or do {
+ if ($@ =~ /\bNOAUTH\b/) {
+ defined $self->{password}
+ or die "Redis server requires authentication, no password provided";
+ $r->call('AUTH', $self->{password});
+ $r->call('SELECT', $db_id);
+ } else {
+ chomp $@; die "Redis error: $@";
+ }
+ };
+ $r->call('CLIENT', 'SETNAME', 'sa['.$$.']');
1;
}
@@ -201,24 +416,20 @@ sub connect {
undef $self->{redis}; # just in case
my $err = $self->{timer}->run_and_catch(sub {
- my $mypid = $self->{opened_from_pid} = $$;
+ $self->{opened_from_pid} = $$;
# will keep a persistent session open to a redis server
- $self->{redis} = Redis->new(
- name => 'sa[' . $mypid . ']',
+ $self->{redis} = Mail::SpamAssassin::BayesStore::TinyRedis->new(
@{$self->{redis_conf}},
- encoding => undef,
- on_connect => sub { my($r) = @_; $self->on_connect($r) },
+ on_connect => sub { $self->on_connect(@_) },
);
+ $self->{redis} or die "Error: $!";
});
if ($self->{timer}->timed_out()) {
- warn("bayes: Redis connection timed out!");
undef $self->{redis};
- return;
+ die "bayes: Redis connection timed out!";
} elsif ($err) {
- $err =~ s{ at /.*}{}s; # skip full trace
- warn("bayes: Redis connection failed: $err");
undef $self->{redis};
- return;
+ die "bayes: Redis failed: $err";
}
$self->{connected} = 1;
}
@@ -236,8 +447,6 @@ forking off child processes.
sub prefork_init {
my ($self) = @_;
- HAS_REDIS or return;
-
# Each child process must establish its own connection with a Redis server,
# re-using a common forked socket leads to serious trouble (garbled data).
#
@@ -265,21 +474,19 @@ This optional method is called in a chil
sub spamd_child_init {
my ($self) = @_;
- HAS_REDIS or return;
-
# Each child process must establish its own connection with a Redis server,
# re-using a common forked socket leads to serious trouble (garbled data).
#
# Just in case the parent master process did not call prefork_init() above,
# we try to silently renounce the use of existing cloned connection here.
- # As the prefork_init plugin callback has only been in introduced in
+ # As the prefork_init plugin callback has only been introduced in
# SpamAssassin 3.4.0, this situation can arrise in case of some third party
# software (or a pre-3.4.0 version of spamd) is somehow using this plugin.
# Better safe than sorry...
if ($self->{connected}) {
dbg("bayes: spamd_child_init, closing a parent's session ".
- "with a Redis server in a child process");
+ "to a Redis server in a child process");
$self->untie_db;
$self->disconnect; # just drop it, don't shut down parent's session
}
@@ -297,8 +504,6 @@ This method ensures that the database co
sub tie_db_readonly {
my($self) = @_;
- HAS_REDIS or return;
-
$self->{is_writable} = 0;
my $success;
if ($self->{connected}) {
@@ -324,8 +529,6 @@ begin using the database immediately.
sub tie_db_writable {
my($self) = @_;
- HAS_REDIS or return;
-
$self->{is_writable} = 0;
my $success;
if ($self->{connected}) {
@@ -353,9 +556,8 @@ the database immediately.
sub _open_db {
my($self) = @_;
- dbg("bayes: _open_db(%s); Redis %s",
- $self->{connected} ? 'already connected' : 'not yet connected',
- Redis->VERSION);
+ dbg("bayes: _open_db(%s)",
+ $self->{connected} ? 'already connected' : 'not yet connected');
if ($self->{connected}) {
$self->{is_officially_open} = 1;
@@ -363,31 +565,28 @@ sub _open_db {
}
$self->read_db_configs();
-
$self->connect;
- my $have_lua = $self->{have_lua};
- if (!$self->{redis_server_version}) {
- my $info = $self->{info} = $self->{redis}->info;
- if ($info) {
- $self->{redis_server_version} = $info->{redis_version};
- $have_lua = $self->{have_lua} = 1 if exists $info->{used_memory_lua};
-
- dbg("bayes: redis server version %s, memory used %.1f MiB%s",
- $info->{redis_version}, $info->{used_memory}/1024/1024,
- !$have_lua ? '' : ", Lua is available");
- }
- if (!$have_lua) {
- warn "bayes: Redis server does not support Lua, ".
- "upgrade or expect slower operation\n";
+ if (!defined $self->{redis_server_version}) {
+ my $info = $self->{info} = $self->{redis}->call("INFO");
+ if (defined $info) {
+ my $redis_mem; local $1;
+ $self->{redis_server_version} =
+ $info =~ /^redis_version:\s*(.*?)\r?$/m ? $1 : '';
+ $self->{have_lua} = $info =~ /^used_memory_lua:/m ? 1 : 0;
+ $redis_mem = $1 if $info =~ /^used_memory:\s*(.*?)\r?$/m;
+ dbg("bayes: redis server version %s, memory used %.1f MiB, Lua %s",
+ $self->{redis_server_version}, $redis_mem/1024/1024,
+ $self->{have_lua} ? 'is available' : 'is not available');
}
}
- $self->{db_version} = $self->_get('v:DB_VERSION');
+ $self->{db_version} = $self->{redis}->call('GET', 'v:DB_VERSION');
if (!$self->{db_version}) {
$self->{db_version} = $self->DB_VERSION;
- my $ret = $self->{redis}->mset('v:DB_VERSION', $self->{db_version},
+ my $ret = $self->{redis}->call('MSET',
+ 'v:DB_VERSION', $self->{db_version},
'v:NSPAM', 0,
'v:NHAM', 0,
'v:TOKEN_FORMAT', 2 );
@@ -403,7 +602,7 @@ sub _open_db {
warn("bayes: bayes db version $self->{db_version} not supported, aborting\n");
return 0;
}
- my $token_format = $self->_get('v:TOKEN_FORMAT') || 0;
+ my $token_format = $self->{redis}->call('GET', 'v:TOKEN_FORMAT') || 0;
if ($token_format < 2) {
warn("bayes: bayes old token format $token_format not supported, ".
"consider backup/restore or initialize a database\n");
@@ -411,7 +610,7 @@ sub _open_db {
}
}
- if ($have_lua && !defined $self->{multi_hmget_script}) {
+ if ($self->{have_lua} && !defined $self->{multi_hmget_script}) {
$self->_define_lua_scripts;
}
@@ -480,7 +679,7 @@ is not found.
sub seen_get {
my($self, $msgid) = @_;
- return $self->_get("s:$msgid");
+ return $self->{redis}->call('GET', "s:$msgid");
}
=head2 seen_put
@@ -496,7 +695,13 @@ of two values 's' for spam and 'h' for h
sub seen_put {
my($self, $msgid, $flag) = @_;
- $self->_set("s:$msgid", $flag, $self->{expire_seen});
+ my $r = $self->{redis};
+ if ($self->{expire_seen}) {
+ $r->call('SETEX', "s:$msgid", $self->{expire_seen}, $flag);
+ } else {
+ $r->call('SET', "s:$msgid", $flag);
+ }
+
return 1;
}
@@ -512,7 +717,7 @@ This method removes C<$msgid> from the d
sub seen_delete {
my($self, $msgid) = @_;
- $self->_del("s:$msgid");
+ $self->{redis}->call('DEL', "s:$msgid");
return 1;
}
@@ -549,8 +754,7 @@ sub get_storage_variables {
OLDEST_TOKEN_AGE DB_VERSION LAST_JOURNAL_SYNC
LAST_ATIME_DELTA LAST_EXPIRE_REDUCE NEWEST_TOKEN_AGE
TOKEN_FORMAT} if !@varnames;
- @varnames = map("v:$_", @varnames);
- my $values = $self->_mget(\@varnames);
+ my $values = $self->{redis}->call('MGET', map('v:'.$_, @varnames));
return if !$values;
return map(defined $_ ? $_ : 0, @$values);
}
@@ -563,9 +767,6 @@ Description:
This method determines if an expire is currently running and returns
the last time set.
-There can be multiple times, so we just pull the greatest (most recent)
-value.
-
=cut
sub get_running_expire_tok {
@@ -637,56 +838,53 @@ sub tok_get_all {
if (! $self->{have_lua} ) {
- foreach my $token (@_) {
- $r->hmget('w:'.$token, 's', 'h', sub {
- my($values, $error) = @_;
- return if !$values || @$values != 2;
- return if !$values->[0] && !$values->[1];
- push(@values, [$token, $values->[0]||0, $values->[1]||0, 0]);
- 1;
- });
+ $r->b_call('HMGET', 'w:'.$_, 's', 'h') for @_;
+ my $results = $r->b_results;
+
+ if (@$results != @_) {
+ $self->disconnect;
+ die sprintf("bayes: tok_get_all got %d entries, expected %d\n",
+ scalar @$results, scalar @_);
+ }
+ for my $j (0 .. $#$results) {
+ my($s,$h) = @{$results->[$j]};
+ push(@values, [$_[$j], ($s||0)+0, ($h||0)+0, 0]) if $s || $h;
}
- $self->_wait_all_responses;
- } else { # have Lua, faster
+ } else { # have Lua
# no need for cryptographical strength, just checking for protocol errors
my $nonce = sprintf("%06x", rand(0xffffff));
- my @results;
+ my $result;
eval {
- @results = $r->evalsha($self->{multi_hmget_script},
- scalar @_, @_, $nonce);
+ $result = $r->call('EVALSHA', $self->{multi_hmget_script},
+ scalar @_, map('w:'.$_, @_), $nonce);
1;
} or do { # Lua script probably not cached, define again and re-try
- if ($@ !~ /^\Q[evalsha] NOSCRIPT\E/) {
+ if ($@ !~ /^NOSCRIPT/) {
$self->disconnect;
die "bayes: Redis LUA error: $@\n";
}
$self->_define_lua_scripts;
- @results = $r->evalsha($self->{multi_hmget_script},
- scalar @_, @_, $nonce);
+ $result = $r->call('EVALSHA', $self->{multi_hmget_script},
+ scalar @_, map('w:'.$_, @_), $nonce);
};
- my $r_nonce = $results[1];
- if (@results != 2) {
- $self->disconnect;
- die sprintf("bayes: tok_get_all expected 2 results, got %d\n",
- scalar @results);
- } elsif ($r_nonce ne $nonce) {
- # check for redis protocol falling out of step
+ my @items = split(' ', $result);
+ my $r_nonce = pop(@items);
+ if ($r_nonce ne $nonce) {
+ # redis protocol error?
$self->disconnect;
die sprintf("bayes: tok_get_all nonce mismatch, expected %s, got %s\n",
$nonce, defined $r_nonce ? $r_nonce : 'UNDEF');
+ } elsif (@items != @_) {
+ $self->disconnect;
+ die sprintf("bayes: tok_get_all got %d entries, expected %d\n",
+ scalar @items, scalar @_);
} else {
- @results = split(' ', $results[0]);
- if (@results != @_) {
- $self->disconnect;
- die sprintf("bayes: tok_get_all got %d entries, expected %d\n",
- scalar @results, scalar @_);
- }
- foreach my $token (@_) {
- my($s,$h) = split(m{/}, shift @results, 2);
- push(@values, [$token, ($s||0)+0, ($h||0)+0, 0]) if $s || $h;
+ for my $j (0 .. $#items) {
+ my($s,$h) = split(m{/}, $items[$j], 2);
+ push(@values, [$_[$j], ($s||0)+0, ($h||0)+0, 0]) if $s || $h;
}
}
}
@@ -734,69 +932,43 @@ sub multi_tok_count_change {
$dham ||= 0;
# the increment must be an integer, otherwise redis returns an error
- my $ttl = $self->{expire_token}; # time-to-live, in seconds
-
dbg("bayes: multi_tok_count_change learning %d spam, %d ham",
$dspam, $dham);
- $self->connect if !$self->{connected};
-
- if ($self->{have_lua}) {
-
- my $r = $self->{redis};
- my @tokens_list = keys %$tokens;
- my $ntokens = scalar @tokens_list;
- my $cnt;
- eval {
- $cnt = $r->evalsha($self->{multi_hincrby},
- $ntokens, @tokens_list, $dspam, $dham, $ttl);
- 1;
- } or do { # Lua script probably not cached, define again and re-try
- if ($@ !~ /^\Q[evalsha] NOSCRIPT\E/) {
- $self->disconnect;
- die "bayes: Redis LUA error: $@\n";
- }
- $self->_define_lua_scripts;
- $cnt = $r->evalsha($self->{multi_hincrby},
- $ntokens, @tokens_list, $dspam, $dham, $ttl);
- };
- if ($cnt != $ntokens) {
- $self->disconnect;
- die sprintf("bayes: multi_tok_count_change got %d, expected %d\n",
- $cnt, $ntokens);
- }
+ my $ttl = $self->{expire_token}; # time-to-live, in seconds
- } else { # no Lua, slower
+ $self->connect if !$self->{connected};
+ my $r = $self->{redis};
- if ($dspam > 0 || $dham > 0) { # learning
- while (my($token,$v) = each(%$tokens)) {
- $self->_hincrby_p('w:'.$token, 's', $dspam) if $dspam > 0;
- $self->_hincrby_p('w:'.$token, 'h', $dham) if $dham > 0;
- $self->_expire_p('w:'.$token, $ttl) if $ttl;
- }
- $self->_wait_all_responses;
+ if ($dspam > 0 || $dham > 0) { # learning
+ while (my($token,$v) = each(%$tokens)) {
+ my $key = 'w:'.$token;
+ $r->b_call('HINCRBY', $key, 's', int $dspam) if $dspam > 0;
+ $r->b_call('HINCRBY', $key, 'h', int $dham) if $dham > 0;
+ $r->b_call('EXPIRE', $key, $ttl) if $ttl;
}
+ $r->b_results; # collect response, ignoring results
+ }
- if ($dspam < 0 || $dham < 0) { # unlearning - rare, not as efficient
- while (my($token,$v) = each(%$tokens)) {
- if ($dspam < 0) {
- my $result = $self->_hincrby('w:'.$token, 's', int $dspam);
- if (!$result || $result <= 0) {
- $self->_hdel_p('w:'.$token, 's');
- } elsif ($ttl) {
- $self->_expire_p('w:'.$token, $ttl);
- }
+ if ($dspam < 0 || $dham < 0) { # unlearning - rare, not as efficient
+ while (my($token,$v) = each(%$tokens)) {
+ my $key = 'w:'.$token;
+ if ($dspam < 0) {
+ my $result = $r->call('HINCRBY', $key, 's', int $dspam);
+ if (!$result || $result <= 0) {
+ $r->call('HDEL', $key, 's');
+ } elsif ($ttl) {
+ $r->call('EXPIRE', $key, $ttl);
}
- if ($dham < 0) {
- my $result = $self->_hincrby('w:'.$token, 'h', int $dham);
- if (!$result || $result <= 0) {
- $self->_hdel_p('w:'.$token, 'h');
- } elsif ($ttl) {
- $self->_expire_p('w:'.$token, $ttl);
- }
+ }
+ if ($dham < 0) {
+ my $result = $r->call('HINCRBY', $key, 'h', int $dham);
+ if (!$result || $result <= 0) {
+ $r->call('HDEL', $key, 'h');
+ } elsif ($ttl) {
+ $r->call('EXPIRE', $key, $ttl);
}
}
- $self->_wait_all_responses;
}
}
@@ -837,10 +1009,12 @@ sub nspam_nham_change {
return 1 unless $ds || $dh;
$self->connect if !$self->{connected};
+ my $r = $self->{redis};
my $err = $self->{timer}->run_and_catch(sub {
- $self->{redis}->incrby("v:NSPAM", $ds) if $ds;
- $self->{redis}->incrby("v:NHAM", $dh) if $dh;
+ $r->b_call('INCRBY', "v:NSPAM", $ds) if $ds;
+ $r->b_call('INCRBY', "v:NHAM", $dh) if $dh;
+ $r->b_results; # collect response, ignoring results
});
if ($self->{timer}->timed_out()) {
@@ -848,7 +1022,6 @@ sub nspam_nham_change {
die("bayes: Redis connection timed out!");
}
elsif ($err) {
- $err =~ s{ at /.*}{}s; # skip full trace
$self->disconnect;
die("bayes: failed to increment nspam $ds nham $dh: $err");
}
@@ -891,42 +1064,25 @@ sub tok_touch_all {
my($self, $tokens, $newatime) = @_;
my $ttl = $self->{expire_token}; # time-to-live, in seconds
- return 1 unless defined $ttl;
+ return 1 unless $ttl && $tokens && @$tokens;
dbg("bayes: tok_touch_all setting expire to %s on %d tokens",
$ttl, scalar @$tokens);
$self->connect if !$self->{connected};
+ my $r = $self->{redis};
- # We just refresh TTL on all
- if (! $self->{have_lua} ) {
+ # Benchmarks for a 'with-Lua' vs. a 'batched non-Lua' case show same speed,
+ # so for simplicity we only kept a batched non-Lua code. Note that this
+ # only applies to our own implementation of the Redis client protocol
+ # which offers efficient command batching (pipelining) - with the Redis
+ # CPAN module the batched case would be worse by about 33% on the average.
- $self->_expire_p("w:$_", $ttl) for @$tokens;
- $self->_wait_all_responses;
+ # We just refresh TTL on all
- } else { # have Lua, faster
+ $r->b_call('EXPIRE', 'w:'.$_, $ttl) for @$tokens;
+ $r->b_results; # collect response, ignoring results
- my $r = $self->{redis};
- my $cnt;
- eval {
- $cnt = $r->evalsha($self->{multi_expire_script},
- scalar @$tokens, @$tokens, $ttl);
- 1;
- } or do { # Lua script probably not cached, define again and re-try
- if ($@ !~ /^\Q[evalsha] NOSCRIPT\E/) {
- $self->disconnect;
- die "bayes: Redis LUA error: $@\n";
- }
- $self->_define_lua_scripts;
- $cnt = $r->evalsha($self->{multi_expire_script},
- scalar @$tokens, @$tokens, $ttl);
- };
- if ($cnt != @$tokens) {
- $self->disconnect;
- die sprintf("bayes: tok_touch_all got %d, expected %d\n",
- $cnt, scalar @$tokens);
- }
- }
return 1;
}
@@ -998,7 +1154,8 @@ sub clear_database {
my($self) = @_;
# TODO
- warn("bayes: you need to manually clear Redis database\n");
+ warn("bayes: note: assuming the database is empty; ".
+ "to manually clear a database: redis-cli -n <db-ind> FLUSHDB\n");
return 1;
}
@@ -1022,60 +1179,55 @@ sub dump_db_toks {
my $atime = time; # fake
- # Sadly it's impossible to prevent Redis-module itself keeping all
- # resulting keys in memory.
- my @keys;
-
# let's get past this terrible command as fast as possible
# (ignoring $regex which makes no sense with SHA digests)
- @keys = $r->keys('w:*');
- dbg("bayes: fetched %d token keys", scalar @keys);
+ my $keys = $r->call('KEYS', 'w:*');
+ dbg("bayes: fetched %d token keys", scalar @$keys);
# process tokens in chunks of 1000
- for (my $i = 0; $i <= $#keys; $i += 1000) {
- my $end = $i + 999 >= $#keys ? $#keys : $i + 999;
+ for (my $i = 0; $i <= $#$keys; $i += 1000) {
+ my $end = $i + 999 >= $#$keys ? $#$keys : $i + 999;
my @tokensdata;
if (! $self->{have_lua}) { # no Lua, 3-times slower
for (my $j = $i; $j <= $end; $j++) {
- my $token = $keys[$j];
- $r->hmget($token, 's', 'h', sub {
- my($val, $error) = @_;
- push(@tokensdata, [ substr($token,2), $val->[0]||0, $val->[1]||0 ])
- if $val && @$val == 2;
- 1;
- });
+ $r->b_call('HMGET', $keys->[$j], 's', 'h');
+ }
+ my $j = $i;
+ my $itemslist_ref = $r->b_results;
+ foreach my $item ( @$itemslist_ref ) {
+ my($s,$h) = @$item;
+ push(@tokensdata,
+ [ substr($keys->[$j],2), ($s||0)+0, ($h||0)+0 ]) if $s || $h;
+ $j++;
}
- $self->_wait_all_responses;
} else { # have_lua
my $nonce = sprintf("%06x", rand(0xffffff));
- my @tokens = map(substr($_,2), @keys[$i .. $end]); # strip leading "w:"
- my @results = $r->evalsha($self->{multi_hmget_script},
- scalar @tokens, @tokens, $nonce);
- my $r_nonce = $results[1];
- if (@results != 2) {
+ my @tokens = @{$keys}[$i .. $end];
+ my $result = $r->call('EVALSHA', $self->{multi_hmget_script},
+ scalar @tokens, @tokens, $nonce);
+ my @items = split(' ', $result);
+ my $r_nonce = pop(@items);
+ if (!defined $r_nonce) {
$self->disconnect;
- die sprintf("bayes: dump_db_toks expected 2 results, got %d\n",
- scalar @results);
+ die "bayes: dump_db_toks received no results\n";
} elsif ($r_nonce ne $nonce) {
- # check for redis protocol falling out of step
+ # redis protocol error?
$self->disconnect;
- die sprintf("bayes: dump_db_toks nonce mismatch, expected %s, got %s\n",
+ die sprintf("bayes: dump_db_toks nonce mismatch, ".
+ "expected %s, got %s\n",
$nonce, defined $r_nonce ? $r_nonce : 'UNDEF');
- } else {
- @results = split(' ', $results[0]);
- if (@results != @tokens) {
- $self->disconnect;
- die sprintf("bayes: dump_db_toks got %d entries, expected %d\n",
- scalar @results, scalar @tokens);
- }
- @results = split(' ', $results[0]);
- @tokensdata = map { my($s,$h) = split(m{/}, shift @results, 2);
- [ $_, $s||0, $h||0 ] } @tokens;
+ } elsif (@items != @tokens) {
+ $self->disconnect;
+ die sprintf("bayes: dump_db_toks got %d entries, expected %d\n",
+ scalar @items, scalar @tokens);
}
+ # stripping a leading "w:"
+ @tokensdata = map { my($s,$h) = split(m{/}, shift @items, 2);
+ [ substr($_,2), ($s||0)+0, ($h||0)+0 ] } @tokens;
}
my $probabilities_ref =
@@ -1091,6 +1243,7 @@ sub dump_db_toks {
or die "Error writing tokens: $!";
}
}
+ dbg("bayes: written token keys");
$self->untie_db();
@@ -1119,84 +1272,73 @@ sub backup_database {
print "v\t$vars[1]\tnum_spam\n";
print "v\t$vars[2]\tnum_nonspam\n";
- # Sadly it's impossible to prevent Redis-module itself keeping all
- # resulting keys in memory.
- my @keys;
-
# let's get past this terrible command as fast as possible
- @keys = $r->keys('w:*');
- dbg("bayes: fetched %d token keys", scalar @keys);
+ my $keys = $r->call('KEYS', 'w:*');
+ dbg("bayes: fetched %d token keys", scalar @$keys);
# process tokens in chunks of 1000
- for (my $i = 0; $i <= $#keys; $i += 1000) {
- my $end = $i + 999 >= $#keys ? $#keys : $i + 999;
+ for (my $i = 0; $i <= $#$keys; $i += 1000) {
+ my $end = $i + 999 >= $#$keys ? $#$keys : $i + 999;
if (! $self->{have_lua}) { # no Lua, slower
for (my $j = $i; $j <= $end; $j++) {
- my $token = $keys[$j];
- $r->hmget($token, 's', 'h', sub {
- my($values, $error) = @_;
- return if !$values || @$values != 2;
- return if !$values->[0] && !$values->[1];
- my $encoded = unpack("H*", substr($token, 2));
- printf("t\t%d\t%d\t%s\t%s\n",
- $values->[0]||0, $values->[1]||0, $atime, $encoded);
- 1;
- });
+ $r->b_call('HMGET', $keys->[$j], 's', 'h');
+ }
+ my $j = $i;
+ my $itemslist_ref = $r->b_results;
+ foreach my $item ( @$itemslist_ref ) {
+ my $encoded = unpack("H*", substr($keys->[$j++], 2));
+ my($s,$h) = @$item;
+ printf("t\t%d\t%d\t%s\t%s\n",
+ $s||0, $h||0, $atime, $encoded) if $s || $h;
}
- $self->_wait_all_responses;
} else { # have_lua
my $nonce = sprintf("%06x", rand(0xffffff));
- my @tokens = map(substr($_,2), @keys[$i .. $end]); # strip leading "w:"
- my @results = $r->evalsha($self->{multi_hmget_script},
- scalar @tokens, @tokens, $nonce);
- my $r_nonce = $results[1];
- if (@results != 2) {
+ my @tokens = @{$keys}[$i .. $end];
+ my $result = $r->call('EVALSHA', $self->{multi_hmget_script},
+ scalar @tokens, @tokens, $nonce);
+ my @items = split(' ', $result);
+ my $r_nonce = pop(@items);
+ if (!defined $r_nonce) {
$self->disconnect;
- die sprintf("bayes: backup_database expected 2 results, got %d\n",
- scalar @results);
+ die "bayes: backup_database received no results\n";
} elsif ($r_nonce ne $nonce) {
- # check for redis protocol falling out of step
+ # redis protocol error?
$self->disconnect;
die sprintf("bayes: backup_database nonce mismatch, ".
"expected %s, got %s\n",
$nonce, defined $r_nonce ? $r_nonce : 'UNDEF');
- } else {
- @results = split(' ', $results[0]);
- if (@results != @tokens) {
- $self->disconnect;
- die sprintf("bayes: backup_database got %d entries, expected %d\n",
- scalar @results, scalar @tokens);
- }
- foreach my $token (@tokens) {
- my($s,$h) = split(m{/}, shift @results, 2);
- next if !$s && !$h;
- my $encoded = unpack("H*", $token);
- printf("t\t%d\t%d\t%s\t%s\n", $s||0, $h||0, $atime, $encoded);
- }
+ } elsif (@items != @tokens) {
+ $self->disconnect;
+ die sprintf("bayes: backup_database got %d entries, expected %d\n",
+ scalar @items, scalar @tokens);
+ }
+ foreach my $token (@tokens) {
+ my($s,$h) = split(m{/}, shift @items, 2);
+ next if !$s && !$h;
+ my $encoded = unpack("H*", substr($token,2)); # strip leading "w:"
+ printf("t\t%d\t%d\t%s\t%s\n", $s||0, $h||0, $atime, $encoded);
}
}
}
+ dbg("bayes: written token keys");
- @keys = $r->keys('s:*');
- dbg("bayes: fetched %d seen keys", scalar @keys);
+ $keys = $r->call('KEYS', 's:*');
+ dbg("bayes: fetched %d seen keys", scalar @$keys);
- for (my $i = 0; $i <= $#keys; $i += 1000) {
- my $end = $i + 999 >= $#keys ? $#keys : $i + 999;
- my @t = @keys[$i .. $end];
- my $v = $self->_mget(\@t);
- if (!$v || !@$v) {
- $self->disconnect;
- die "bayes: seen fetch failed";
- }
+ for (my $i = 0; $i <= $#$keys; $i += 1000) {
+ my $end = $i + 999 >= $#$keys ? $#$keys : $i + 999;
+ my @t = @{$keys}[$i .. $end];
+ my $v = $r->call('MGET', @t);
for (my $i = 0; $i < @$v; $i++) {
next unless defined $v->[$i];
printf("s\t%s\t%s\n", $v->[$i], substr($t[$i], 2));
}
}
+ dbg("bayes: written seen keys");
$self->untie_db();
@@ -1224,19 +1366,13 @@ sub restore_database {
return 0;
}
- # This is the critical phase (moving sql around), so don't allow it
- # to be interrupted.
- #local $SIG{'INT'} = 'IGNORE';
- #local $SIG{'HUP'} = 'IGNORE'
- # if !Mail::SpamAssassin::Util::am_running_on_windows();
- #local $SIG{'TERM'} = 'IGNORE';
-
unless ($self->clear_database()) {
return 0;
}
return 0 unless $self->tie_db_writable;
$self->connect if !$self->{connected};
+ my $r = $self->{redis};
my $token_count = 0;
my $db_version;
@@ -1264,6 +1400,8 @@ sub restore_database {
my $curtime = time;
my $q_cnt = 0;
+ my $token_ttl = $self->{expire_token}; # possibly undefined
+ my $seen_ttl = $self->{expire_seen}; # possibly undefined
for ($!=0; defined($line=<DUMPFILE>); $!=0) {
chomp($line);
@@ -1282,7 +1420,7 @@ sub restore_database {
$spam_count = 0 if $spam_count < 0;
$ham_count = 0 if $ham_count < 0;
- next if $spam_count == 0 && $ham_count == 0;
+ next if !$spam_count && !$ham_count;
if ($db_version < 3) {
# versions < 3 use plain text tokens, so we need to convert to hash
@@ -1292,16 +1430,24 @@ sub restore_database {
$token = pack("H*",$token);
}
my $key = 'w:'.$token;
- $self->_hincrby_p($key, 's', int $spam_count) if $spam_count > 0;
- $self->_hincrby_p($key, 'h', int $ham_count) if $ham_count > 0;
- $self->_expire_p($key, $self->{expire_token})
- if defined $self->{expire_token};
+ $r->b_call('HINCRBY', $key, 's', int $spam_count) if $spam_count > 0;
+ $r->b_call('HINCRBY', $key, 'h', int $ham_count) if $ham_count > 0;
+
+ if ($token_ttl) {
+ # by introducing some randomness (ttl times a factor of 0.7 .. 1.7),
+ # we avoid auto-expiration of many tokens all at once,
+ # introducing an unnecessary load spike on a redis server
+ $r->b_call('EXPIRE', $key, int($token_ttl * (rand()+0.7)));
+ }
+
+ # collect response every now and then, ignoring results
+ $r->b_results if ++$q_cnt % 1000 == 0;
- $self->{redis}->wait_all_responses if ++$q_cnt % 10000 == 0;
$token_count++;
+
} elsif ($line =~ /^s\s+/) { # seen line
my @parsed_line = split(/\s+/, $line, 3);
- my $flag = $parsed_line[1];
+ my $flag = $parsed_line[1];
my $msgid = $parsed_line[2];
unless ($flag eq 'h' || $flag eq 's') {
@@ -1314,10 +1460,19 @@ sub restore_database {
next;
}
- $self->_set_p("s:$msgid", $flag, $self->{expire_seen});
- $self->{redis}->wait_all_responses if ++$q_cnt % 10000 == 0;
- }
- elsif ($line =~ /^v\s+/) { # variable line
+ if (!$seen_ttl) {
+ $r->b_call('SET', "s:$msgid", $flag);
+ } else {
+ # by introducing some randomness (ttl times a factor of 0.7 .. 1.7),
+ # we avoid auto-expiration of many 'seen' entries all at once,
+ # introducing an unnecessary load spike on a redis server
+ $r->b_call('SETEX', "s:$msgid", int($seen_ttl * (rand()+0.7)), $flag);
+ }
+
+ # collect response every now and then, ignoring results
+ $r->b_results if ++$q_cnt % 1000 == 0;
+
+ } elsif ($line =~ /^v\s+/) { # variable line
my @parsed_line = split(/\s+/, $line, 3);
my $value = $parsed_line[1] + 0;
if ($parsed_line[2] eq 'num_spam') {
@@ -1327,19 +1482,20 @@ sub restore_database {
} else {
dbg("bayes: restore_database: skipping unknown line: $line");
}
+
} else {
dbg("bayes: skipping unknown line: $line");
next;
}
}
+ $r->b_results; # collect any remaining response, ignoring results
+
defined $line || $!==0 or
$!==EBADF ? dbg("bayes: error reading dump file: $!")
: die "error reading dump file: $!";
close(DUMPFILE) or die "Can't close dump file: $!";
- $self->{redis}->wait_all_responses;
-
print STDERR "\n" if $showdots;
if ($num_spam <= 0 && $num_ham <= 0) {
@@ -1351,7 +1507,8 @@ sub restore_database {
}
dbg("bayes: parsed $line_count lines");
- dbg("bayes: created database with $token_count tokens based on $num_spam spam messages and $num_ham ham messages");
+ dbg("bayes: created database with $token_count tokens ".
+ "based on $num_spam spam messages and $num_ham ham messages");
$self->untie_db();
@@ -1397,272 +1554,34 @@ sub db_writable {
sub _define_lua_scripts {
my $self = shift;
dbg("bayes: defining Lua scripts");
+
$self->connect if !$self->{connected};
my $r = $self->{redis};
- $self->{multi_hmget_script} = $r->script_load(<<'END');
+ $self->{multi_hmget_script} = $r->call('SCRIPT', 'LOAD', <<'END');
local rcall = redis.call
- local r = {}
local nonce = ARGV[1]
+ local KEYS = KEYS
+ local r = {}
for j = 1, #KEYS do
- local sh = rcall("HMGET", "w:" .. KEYS[j], "s", "h")
+ local sh = rcall("HMGET", KEYS[j], "s", "h")
-- returns counts as a list of spam/ham pairs, zeroes may be omitted
local s, h = sh[1] or "0", sh[2] or "0"
local pair
if h == "0" then
pair = s -- just a spam field, possibly zero; a ham field omitted
elseif s == "0" then
- pair = "/" .. h -- just a ham field, zero in spam suppressed
+ pair = "/" .. h -- just a ham field, zero in a spam field suppressed
else
pair = s .. "/" .. h
end
r[#r+1] = pair
end
+ r[#r+1] = nonce
-- return counts as a single string, avoids overhead of multiresult parsing
- return { table.concat(r," "), nonce }
+ return table.concat(r," ")
END
-
- $self->{multi_expire_script} = $r->script_load(<<'END');
- local ttl = ARGV[1]
- local rcall = redis.call
- for j = 1, #KEYS do
- rcall("EXPIRE", "w:" .. KEYS[j], ttl)
- end
- return #KEYS
-END
-
- $self->{multi_hincrby} = $r->script_load(<<'END');
- local s, h, ttl = ARGV[1], ARGV[2], ARGV[3]
- local set_expire = ttl and tonumber(ttl) > 0
- local rcall = redis.call
- if tonumber(s) ~= 0 then
- for j = 1, #KEYS do
- local token = "w:" .. KEYS[j]
- local cnt = rcall("HINCRBY", token, "s", s)
- if cnt <= 0 then
- rcall("HDEL", token, "s")
- elseif set_expire then
- rcall("EXPIRE", token, ttl)
- end
- end
- end
- if tonumber(h) ~= 0 then
- for j = 1, #KEYS do
- local token = "w:" .. KEYS[j]
- local cnt = rcall("HINCRBY", token, "h", h)
- if cnt <= 0 then
- rcall("HDEL", token, "h")
- elseif set_expire then
- rcall("EXPIRE", token, ttl)
- end
- end
- end
- return #KEYS
-END
-
1;
}
-sub _get {
- my ($self, $key) = @_;
-
- my $value;
-
- my $err = $self->{timer}->run_and_catch(sub {
- $value = $self->{redis}->get($key);
- });
-
- if ($self->{timer}->timed_out()) {
- $self->disconnect;
- die("bayes: get timed out!");
- }
- elsif ($err) {
- $err =~ s{ at /.*}{}s; # skip full trace
- $self->disconnect;
- die("bayes: get failed: $err");
- }
-
- return $value;
-}
-
-sub _mget {
- my ($self, $keys) = @_;
-
- my @values;
-
- my $err = $self->{timer}->run_and_catch(sub {
- @values = $self->{redis}->mget(@$keys);
- });
-
- if ($self->{timer}->timed_out()) {
- $self->disconnect;
- die("bayes: mget timed out!");
- }
- elsif ($err) {
- $err =~ s{ at /.*}{}s; # skip full trace
- $self->disconnect;
- die("bayes: mget failed: $err");
- }
-
- return \@values;
-}
-
-sub _hmget {
- my ($self, $key, @fields) = @_;
-
- my $value;
- my $err = $self->{timer}->run_and_catch(sub {
- $value = $self->{redis}->hmget($key, @fields);
- });
-
- if ($self->{timer}->timed_out()) {
- $self->disconnect;
- die("bayes: hmget timed out!");
- }
- elsif ($err) {
- $err =~ s{ at /.*}{}s; # skip full trace
- $self->disconnect;
- die("bayes: hmget failed: $err");
- }
-
- return $value;
-}
-
-sub _set {
- my ($self, $key, $value, $expire) = @_;
-
- my $err = $self->{timer}->run_and_catch(sub {
- if (defined $expire) {
- $self->{redis}->setex($key, $expire, $value);
- } else {
- $self->{redis}->set($key, $value);
- }
- });
-
- if ($self->{timer}->timed_out()) {
- $self->disconnect;
- die("bayes: set timed out!");
- }
- elsif ($err) {
- $err =~ s{ at /.*}{}s; # skip full trace
- $self->disconnect;
- die("bayes: set failed: $err");
- }
-
- return 1;
-}
-
-sub _hincrby {
- my ($self, $key, $field, $incr) = @_;
-
- my $err = $self->{timer}->run_and_catch(sub {
- $self->{redis}->hincrby($key, $field, $incr);
- });
-
- if ($self->{timer}->timed_out()) {
- $self->disconnect;
- die("bayes: hincrby timed out!");
- }
- elsif ($err) {
- $err =~ s{ at /.*}{}s; # skip full trace
- $self->disconnect;
- die("bayes: hincrby failed: $err");
- }
-
- return 1;
-}
-
-# Pipelined set, must call _wait_all_responses after
-sub _set_p {
- my ($self, $key, $value, $expire) = @_;
-
- if (defined $expire) {
- $self->{redis}->setex($key, $expire, $value, sub {});
- } else {
- $self->{redis}->set($key, $value, sub {});
- }
-
- return 1;
-}
-
-# Pipelined hincrby, must call _wait_all_responses after
-sub _hincrby_p {
- my ($self, $key, $field, $incr) = @_;
-
- $self->{redis}->hincrby($key, $field, $incr, sub {});
-
- return 1;
-}
-
-# Pipelined del, must call _wait_all_responses after
-sub _del_p {
- my ($self, $key) = @_;
-
- $self->{redis}->del($key, sub {});
-
- return 1;
-}
-
-# Pipelined hdel, must call _wait_all_responses after
-sub _hdel_p {
- my ($self, $key, $field) = @_;
-
- $self->{redis}->hdel($key, $field, sub {});
-
- return 1;
-}
-
-# Pipelined expire, must call _wait_all_responses after
-sub _expire_p {
- my ($self, $key, $expire) = @_;
-
- if (defined $expire) {
- $self->{redis}->expire($key, $expire, sub {});
- }
-
- return 1;
-}
-
-sub _wait_all_responses {
- my ($self) = @_;
-
- my $err = $self->{timer}->run_and_catch(sub {
- $self->{redis}->wait_all_responses;
- });
-
- if ($self->{timer}->timed_out()) {
- $self->disconnect;
- die sprintf("bayes: wait_all_responses timed out! called from line %s\n",
- (caller)[2]);
- }
- elsif ($err) {
- $err =~ s{ at /.*}{}s; # skip full trace
- $self->disconnect;
- die sprintf("bayes: wait_all_responses failed: %s, called from line %s\n",
- $err, (caller)[2]);
- }
-
- return 1;
-}
-
-sub _del {
- my ($self, $key) = @_;
-
- my $err = $self->{timer}->run_and_catch(sub {
- $self->{redis}->del($key);
- });
-
- if ($self->{timer}->timed_out()) {
- $self->disconnect;
- die("bayes: del timed out!");
- }
- elsif ($err) {
- $err =~ s{ at /.*}{}s; # skip full trace
- $self->disconnect;
- die("bayes: del failed: $err");
- }
-
- return 1;
-}
-
1;