You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spamassassin.apache.org by jm...@apache.org on 2005/06/01 06:06:55 UTC

svn commit: r179348 - /spamassassin/trunk/lib/Mail/SpamAssassin/SpamdForkScaling.pm /spamassassin/trunk/lib/Mail/SpamAssassin/SubProcBackChannel.pm

Author: jm
Date: Tue May 31 21:06:53 2005
New Revision: 179348

URL: http://svn.apache.org/viewcvs?rev=179348&view=rev
Log:
More robustness for prefork scaling: use non-blocking I/O internally so that we can implement timeouts, and loop to recover when partial reads or writes occur.  Also add warnings for those two cases, and debug messages for non-blocking I/O retries, to aid debugging.

Modified:
    spamassassin/trunk/lib/Mail/SpamAssassin/SpamdForkScaling.pm
    spamassassin/trunk/lib/Mail/SpamAssassin/SubProcBackChannel.pm

Modified: spamassassin/trunk/lib/Mail/SpamAssassin/SpamdForkScaling.pm
URL: http://svn.apache.org/viewcvs/spamassassin/trunk/lib/Mail/SpamAssassin/SpamdForkScaling.pm?rev=179348&r1=179347&r2=179348&view=diff
==============================================================================
--- spamassassin/trunk/lib/Mail/SpamAssassin/SpamdForkScaling.pm (original)
+++ spamassassin/trunk/lib/Mail/SpamAssassin/SpamdForkScaling.pm Tue May 31 21:06:53 2005
@@ -21,6 +21,7 @@
 use strict;
 use warnings;
 use bytes;
+use Errno qw();
 
 use Mail::SpamAssassin::Util;
 use Mail::SpamAssassin::Logger;
@@ -49,6 +50,11 @@
 
 use constant PFORDER_ACCEPT      => 10;
 
+
+# timeout, in seconds, for a sysread() on the command channel.  if we go
+# this long without a message from the spamd parent or child, it's an error.
+use constant TOUT_READ_MAX       => 300;
+
 ###########################################################################
 
 sub new {
@@ -202,7 +208,8 @@
 
   # "I  b1 b2 b3 b4 \n " or "B  b1 b2 b3 b4 \n "
   my $line;
-  my $nbytes = $sock->sysread($line, 6);
+  my $nbytes = $self->sysread_with_timeout($sock, \$line, 6, TOUT_READ_MAX);
+
   if (!defined $nbytes || $nbytes == 0) {
     dbg("prefork: child closed connection");
 
@@ -266,7 +273,7 @@
       return $self->order_idle_child_to_accept();
     }
 
-    if (!$sock->syswrite ("A....\n"))
+    if (!$self->syswrite_with_retry($sock, "A....\n"))
     {
       # failure to write to the child; bad news.  call it dead
       warn "prefork: killing rogue child $kid, failed to write: $!\n";
@@ -314,7 +321,7 @@
   my ($self, $kid) = @_;
   if ($self->{waiting_for_idle_child}) {
     my $sock = $self->{backchannel}->get_socket_for_child($kid);
-    $sock->syswrite ("A....\n")
+    $self->syswrite_with_retry($sock, "A....\n")
         or die "prefork: $kid claimed it was ready, but write failed: $!";
     $self->{waiting_for_idle_child} = 0;
   }
@@ -343,7 +350,7 @@
 sub report_backchannel_socket {
   my ($self, $str) = @_;
   my $sock = $self->{backchannel}->get_parent_socket();
-  syswrite ($sock, $str)
-        or write "syswrite() to parent failed: $!";
+  $self->syswrite_with_retry($sock, $str)
+        or warn "syswrite() to parent failed: $!";   # report, don't die
 }

@@ -354,7 +361,7 @@
   while (1) {
     # "A  .  .  .  .  \n "
     my $line;
-    my $nbytes = $sock->sysread($line, 6);
+    my $nbytes = $self->sysread_with_timeout($sock, \$line, 6, TOUT_READ_MAX);
     if (!defined $nbytes || $nbytes == 0) {
       if ($sock->eof()) {
         dbg("prefork: parent closed, exiting");
@@ -375,6 +382,125 @@
       die "prefork: unknown order from parent: '$line'";
     }
   }
+}
+
+###########################################################################
+
+# sysread_with_timeout($sock, \$line, $toread, $timeout)
+#
+# Read $toread bytes from the non-blocking socket $sock into $line, looping
+# over partial reads and waiting in select() while the socket is not yet
+# readable.  Returns the number of bytes read (fewer than $toread only if
+# EOF was hit first; 0 for an immediate EOF), or undef on a read error or
+# once $timeout seconds have passed since the first EAGAIN/EWOULDBLOCK.
+sub sysread_with_timeout {
+  my ($self, $sock, $lineref, $toread, $timeout) = @_;
+
+  $$lineref = '';   # clear the output buffer
+  my $readsofar = 0;
+  my $deadline;     # we only set this if the first read fails
+  my $buf;
+
+  while (1) {
+    my $nbytes = $sock->sysread($buf, $toread);
+
+    if (!defined $nbytes) {
+      unless ((exists &Errno::EAGAIN && $! == &Errno::EAGAIN)
+          || (exists &Errno::EWOULDBLOCK && $! == &Errno::EWOULDBLOCK))
+      {
+        # an error that wasn't non-blocking I/O-related.  that's serious
+        return undef;
+      }
+
+      # ok, we didn't get it first time.  we'll have to start using
+      # select() and timeouts (which is slower).  Don't warn just yet,
+      # as it's quite acceptable in our design to have to "block" on
+      # sysread()s here.
+
+      my $now = time();
+      my $tout = $timeout;
+      if (!defined $deadline) {
+        # set this.  it'll be close enough ;)
+        $deadline = $now + $timeout;
+      }
+      elsif ($now >= $deadline) {
+        # timed out!  report failure.  (">=" rather than ">": time() only
+        # has 1-second granularity, so with ">" we could spin on
+        # zero-length select()s until the clock ticks past the deadline.)
+        warn "prefork: sysread(".$sock->fileno.") failed after $timeout secs";
+        return undef;
+      }
+      else {
+        $tout = $deadline - $now;     # the remaining timeout
+      }
+
+      dbg("prefork: sysread(".$sock->fileno.") not ready, wait max $tout secs");
+      my $rin = '';
+      vec($rin, $sock->fileno, 1) = 1;
+      select($rin, undef, undef, $tout);
+      next;     # retry the read
+    }
+
+    if ($nbytes == 0) {           # EOF
+      return $readsofar;          # may be a partial read, or 0 for EOF
+    }
+
+    if ($nbytes == $toread) {     # a complete read, nice.
+      $readsofar += $nbytes;
+      $$lineref .= $buf;
+      return $readsofar;
+    }
+
+    # we want to know about this.  this is not supposed to happen!
+    warn "prefork: partial read of $nbytes, toread=".$toread.
+            " sofar=".$readsofar." fd=".$sock->fileno.", recovering";
+    $readsofar += $nbytes;
+    $$lineref .= $buf;
+    $toread -= $nbytes;           # then loop round to read the remainder
+  }
+}
+
+# syswrite_with_retry($sock, $buf)
+#
+# Write all of $buf to the non-blocking socket $sock, looping over partial
+# writes.  On EAGAIN/EWOULDBLOCK, warn and wait up to 5 seconds in select()
+# for the socket to become writable, then retry (indefinitely).  Returns the
+# total number of bytes written, or undef on any other write error.
+sub syswrite_with_retry {
+  my ($self, $sock, $buf) = @_;
+
+  my $written = 0;
+
+  while (1) {
+    my $nbytes = $sock->syswrite($buf);
+
+    if (!defined $nbytes) {
+      unless ((exists &Errno::EAGAIN && $! == &Errno::EAGAIN)
+          || (exists &Errno::EWOULDBLOCK && $! == &Errno::EWOULDBLOCK))
+      {
+        # an error that wasn't non-blocking I/O-related.  that's serious
+        return undef;
+      }
+
+      warn "prefork: syswrite(".$sock->fileno.") failed, retrying...";
+
+      # give it 5 seconds to recover.  we retry indefinitely.
+      my $rout = '';
+      vec($rout, $sock->fileno, 1) = 1;
+      select(undef, $rout, undef, 5);
+      next;     # retry the write
+    }
+
+    $written += $nbytes;
+    $buf = substr($buf, $nbytes);
+
+    if ($buf eq '') {
+      return $written;      # it's complete, we can return
+    }
+
+    warn "prefork: partial write of $nbytes, towrite=".length($buf).
+          " sofar=".$written." fd=".$sock->fileno.", recovering";
+  }
 }
 
 ###########################################################################

Modified: spamassassin/trunk/lib/Mail/SpamAssassin/SubProcBackChannel.pm
URL: http://svn.apache.org/viewcvs/spamassassin/trunk/lib/Mail/SpamAssassin/SubProcBackChannel.pm?rev=179348&r1=179347&r2=179348&view=diff
==============================================================================
--- spamassassin/trunk/lib/Mail/SpamAssassin/SubProcBackChannel.pm (original)
+++ spamassassin/trunk/lib/Mail/SpamAssassin/SubProcBackChannel.pm Tue May 31 21:06:53 2005
@@ -72,6 +72,12 @@
   ($self->{latest_kid_fh}, $self->{parent}) =
             $io->socketpair(AF_UNIX,SOCK_STREAM,PF_UNSPEC)
             or die "backchannel: socketpair failed: $!";
+
+  # set those to use non-blocking I/O
+  $self->{parent}->blocking(0)
+            or die "backchannel: set non-blocking failed: $!";
+  $self->{latest_kid_fh}->blocking(0)
+            or die "backchannel: set non-blocking failed: $!";
 }
 
 sub setup_backchannel_parent_post_fork {