You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spamassassin.apache.org by jm...@apache.org on 2005/05/04 09:00:17 UTC

svn commit: r168094 - /spamassassin/trunk/lib/Mail/SpamAssassin/SpamdForkScaling.pm

Author: jm
Date: Wed May  4 00:00:16 2005
New Revision: 168094

URL: http://svn.apache.org/viewcvs?rev=168094&view=rev
Log:
bug 4258: fix spamd lockup on FreeBSD by moving to a fixed-buffer-size protocol between spamd parent and children.  This avoids a hang when two messages arrive in the same read(2) call, which gets buffered on FreeBSD

Modified:
    spamassassin/trunk/lib/Mail/SpamAssassin/SpamdForkScaling.pm

Modified: spamassassin/trunk/lib/Mail/SpamAssassin/SpamdForkScaling.pm
URL: http://svn.apache.org/viewcvs/spamassassin/trunk/lib/Mail/SpamAssassin/SpamdForkScaling.pm?rev=168094&r1=168093&r2=168094&view=diff
==============================================================================
--- spamassassin/trunk/lib/Mail/SpamAssassin/SpamdForkScaling.pm (original)
+++ spamassassin/trunk/lib/Mail/SpamAssassin/SpamdForkScaling.pm Wed May  4 00:00:16 2005
@@ -169,12 +169,12 @@
     return;
   }
 
+  # otherwise it's a status report from a child.
   foreach my $fh ($self->{backchannel}->select_vec_to_fh_list($rout))
   {
-    # otherwise it's a status report from a child.
     # just read one line.  if there's more lines, we'll get them
     # when we re-enter the can_read() select call above...
-    if ($self->read_one_line_from_child_socket($fh) == PFSTATE_IDLE)
+    if ($self->read_one_message_from_child_socket($fh) == PFSTATE_IDLE)
     {
       dbg("prefork: child reports idle");
       if ($self->{overloaded}) {
@@ -199,11 +199,13 @@
   $self->adapt_num_children();
 }
 
-sub read_one_line_from_child_socket {
+sub read_one_message_from_child_socket {
   my ($self, $sock) = @_;
 
-  my $line = $sock->getline();
-  if (!defined $line) {
+  # "I  b1 b2 b3 b4 \n " or "B  b1 b2 b3 b4 \n "
+  my $line;
+  my $nbytes = $sock->sysread($line, 6);
+  if (!defined $nbytes || $nbytes == 0) {
     dbg ("prefork: child closed connection");
 
     # stop it being select'd
@@ -215,14 +217,20 @@
 
     return PFSTATE_ERROR;
   }
+  if ($nbytes < 6) {
+    warn ("prefork: child gave short message: len=$nbytes bytes=".
+                    join(" ", unpack "C*", $line));
+  }
 
   chomp $line;
-  if ($line =~ /^I(\d+)/) {
-    $self->set_child_state ($1, PFSTATE_IDLE);
+  if ($line =~ s/^I//) {
+    my $pid = unpack("N1", $line);
+    $self->set_child_state ($pid, PFSTATE_IDLE);
     return PFSTATE_IDLE;
   }
-  elsif ($line =~ /^B(\d+)/) {
-    $self->set_child_state ($1, PFSTATE_BUSY);
+  elsif ($line =~ s/^B//) {
+    my $pid = unpack("N1", $line);
+    $self->set_child_state ($pid, PFSTATE_BUSY);
     return PFSTATE_BUSY;
   }
   else {
@@ -235,8 +243,11 @@
 
 # we use the following protocol between the master and child processes to
 # control when they accept/who accepts: server tells a child to accept with a
-# "A\n", child responds with "B$pid\n" when it's busy, and "I$pid\n" once it's
-# idle again.  Very simple, line-based protocol.
+# "A....\n", child responds with "B$pid\n" when it's busy, and "I$pid\n" once
+# it's idle again.  Very simple protocol.  Note that the $pid values are packed
+# into 4 bytes so that the buffers are always of a known length; if you need to
+# transfer longer data, assign a new protocol verb (the first char) and use the
+# length of the following data buffer as the packed value.
 
 sub order_idle_child_to_accept {
   my ($self) = @_;
@@ -245,7 +256,7 @@
   if (defined $kid)
   {
     my $sock = $self->{backchannel}->get_socket_for_child($kid);
-    if (!$sock->syswrite ("A\n"))
+    if (!$sock->syswrite ("A....\n"))
     {
       # failure to write to the child; bad news.  call it dead
       warn "prefork: killing rogue child $kid, failed to write: $!\n";
@@ -272,7 +283,7 @@
   my ($self, $sock) = @_;
 
   while (1) {
-    my $state = $self->read_one_line_from_child_socket($sock);
+    my $state = $self->read_one_message_from_child_socket($sock);
     if ($state == PFSTATE_BUSY) {
       return 1;     # 1 == success
     }
@@ -289,7 +300,7 @@
   my ($self, $kid) = @_;
   if ($self->{waiting_for_idle_child}) {
     my $sock = $self->{backchannel}->get_socket_for_child($kid);
-    $sock->syswrite ("A\n")
+    $sock->syswrite ("A....\n")
         or die "prefork: $kid claimed it was ready, but write failed: $!";
     $self->{waiting_for_idle_child} = 0;
   }
@@ -305,12 +316,14 @@
 
 sub update_child_status_idle {
   my ($self) = @_;
-  $self->report_backchannel_socket("I".$self->{pid}."\n");
+  # "I  b1 b2 b3 b4 \n "
+  $self->report_backchannel_socket("I".pack("N",$self->{pid})."\n");
 }
 
 sub update_child_status_busy {
   my ($self) = @_;
-  $self->report_backchannel_socket("B".$self->{pid}."\n");
+  # "B  b1 b2 b3 b4 \n "
+  $self->report_backchannel_socket("B".pack("N",$self->{pid})."\n");
 }
 
 sub report_backchannel_socket {
@@ -325,10 +338,17 @@
 
   my $sock = $self->{backchannel}->get_parent_socket();
   while (1) {
-    my $line = $sock->getline();
-    if (!defined($line)) {
+    # "A  .  .  .  .  \n "
+    my $line;
+    my $nbytes = $sock->sysread($line, 6);
+    if (!defined $nbytes || $nbytes == 0) {
       die "prefork: empty order from parent";
     }
+    if ($nbytes < 6) {
+      warn ("prefork: parent gave short message: len=$nbytes bytes=".
+                      join(" ", unpack "C*", $line));
+    }
+
     chomp $line;
     if (index ($line, "A") == 0) {  # string starts with "A" = accept
       return PFORDER_ACCEPT;