You are viewing a plain text version of this content. The canonical link for it is here.
Posted to axkit-dev@xml.apache.org by ma...@sergeant.org on 2006/08/21 23:26:05 UTC

[SVN] [106] Continuation support, and moved everything into a sort-of state machine

Revision: 106
Author:   matt
Date:     2006-08-21 21:25:46 +0000 (Mon, 21 Aug 2006)

Log Message:
-----------
Continuation support, and moved everything into a sort-of state machine

(needs a bit of a refactor, but for now it works)

Modified Paths:
--------------
    trunk/lib/AxKit2/Client.pm
    trunk/lib/AxKit2/Connection.pm
    trunk/lib/AxKit2/Constants.pm
    trunk/lib/AxKit2/Plugin.pm

Modified: trunk/lib/AxKit2/Client.pm
===================================================================
--- trunk/lib/AxKit2/Client.pm	2006-08-21 21:25:09 UTC (rev 105)
+++ trunk/lib/AxKit2/Client.pm	2006-08-21 21:25:46 UTC (rev 106)
@@ -97,120 +97,259 @@
     
     my $conf = $self->config();
     
-    my @r;
+    my @hooks;
   MAINLOOP:
     for my $plugin ($conf->plugins) {
         my $plug = plugin_instance($plugin) || next;
         for my $h ($plug->hooks($hook)) {
-            $self->log(LOGDEBUG, "$plugin running hook $hook") unless $hook eq 'logging';
-            eval { @r = $plug->$h($self, $conf, @_) };
-            if ($@) {
-                my $err = $@;
-                $self->log(LOGERROR, "FATAL PLUGIN ERROR: $err");
-                return SERVER_ERROR, $err;
-            }
-            next unless @r;
-            last MAINLOOP unless $r[0] == DECLINED;
+            push @hooks, [$plugin, $plug, $h];
         }
     }
+    
+    $self->_run_hooks($hook, [@_], \@hooks);
+}
+
+sub finish_continuation {
+    my ($self) = @_;
+    my $todo = $self->{continuation} || die "No continuation in progress";
+    $self->continue_read();
+    $self->{continuation} = undef;
+    my $hook = shift @$todo;
+    my $args = shift @$todo;
+    $self->_run_hooks($hook, $args, $todo);
+}
+
+sub _run_hooks {
+    my $self = shift;
+    my ($hook, $args, $todo) = @_;
+    
+    my $conf = $self->config();
+    
+    my @r;
+    while (@$todo) {
+        my $info = shift @$todo;
+        my ($plugin, $plug, $h) = @$info;
+        $self->log(LOGDEBUG, "$plugin running hook $hook") unless $hook eq 'logging';
+        eval { @r = $plug->$h($self, $conf, @$args) };
+        if ($@) {
+            my $err = $@;
+            $self->log(LOGERROR, "FATAL PLUGIN ERROR: $err");
+            return SERVER_ERROR, $err;
+        }
+        next unless @r;
+        if ($r[0] == CONTINUATION) {
+            $self->pause_read();
+            $self->{continuation} = [$hook, [@_], @$todo];
+        }
+        last unless $r[0] == DECLINED;
+    }
     $r[0] = DECLINED if not defined $r[0];
-    return @r;
+    if ($r[0] != CONTINUATION) {
+        my $responder = "hook_${hook}_end";
+        if (my $meth = $self->can($responder)) {
+            return $meth->($self, $r[0], $r[1], @$args);
+        }
+    }
 }
 
 sub log {
     my $self = shift;
-    my ($ret, $out) = $self->run_hooks('logging', @_);
+    $self->run_hooks('logging', @_);
 }
 
 sub hook_connect {
     my $self = shift;
-    my ($ret, $out) = $self->run_hooks('connect');
-    if ($ret == DECLINED) {
-        return 1;
+    $self->run_hooks('connect');
+}
+
+sub hook_connect_end {
+    my $self = shift;
+    my ($ret, $out) = @_;
+    if ($ret == DECLINED || $ret == OK) {
+        # success
+        $self->hook_pre_request;
     }
     else {
         # TODO: Output some stuff...
+        $self->close("connect hook closing");
         return;
     }
 }
 
-sub hook_uri_to_file {
+sub hook_pre_request {
     my $self = shift;
-    my ($ret, $out) = $self->run_hooks('uri_translation', @_);
-    if ($ret == DECLINED || $ret == OK) {
+    $self->run_hooks('pre_request');
+}
+
+sub hook_pre_request_end {
+    my $self = shift;
+    my ($ret, $out) = @_;
+    # TODO: Manage $ret
+    return;
+}
+
+sub hook_body_data {
+    my $self = shift;
+    $self->run_hooks('body_data', @_);
+}
+
+sub hook_body_data_end {
+    my ($self, $ret) = @_;
+    if ($ret == DECLINED) {
+        return;
+    }
+    if ($ret == DONE) {
+        $self->hook_uri_to_file();
+        return;
+    }
+    elsif ($ret == OK) {
         return 1;
     }
     else {
         # TODO: output error stuff?
+    }
+}
+
+sub hook_post_read_request {
+    my $self = shift;
+    $self->run_hooks('post_read_request', @_);
+}
+
+sub hook_post_read_request_end {
+    my ($self, $ret) = @_;
+    if ($ret == DECLINED || $ret == OK) {
+        if ($self->headers_in->request_method =~ /GET|HEAD/) {
+            return $self->process_request;
+        }
         return;
     }
+    else {
+        # TODO: Handle errors or other conditions
+    }
 }
 
+sub hook_uri_translation {
+    my ($self, $hd, $uri) = @_;
+    $self->run_hooks('uri_translation', $hd, $uri);
+}
+
+sub hook_uri_translation_end {
+    my ($self, $ret, $out, $hd) = @_;
+    if ($ret == DECLINED || $ret == OK) {
+        return $self->hook_mime_map($hd, $hd->filename);
+    }
+    else {
+        # TODO: output error stuff?
+        return;
+    }
+}
+
+sub hook_mime_map {
+    my $self = shift;
+    $self->run_hooks('mime_map', @_);
+}
+
+sub hook_mime_map_end {
+    my ($self, $ret, $out, $hd) = @_;
+    if ($ret == DECLINED || $ret == OK) {
+        return $self->hook_access_control($hd);
+    }
+    else {
+        # TODO: output error stuff?
+    }
+}
+
 sub hook_access_control {
-    1;
+    my $self = shift;
+    $self->run_hooks('access_control', @_);
 }
 
+sub hook_access_control_end {
+    my ($self, $ret, $out, $hd) = @_;
+    if ($ret == DECLINED || $ret == OK) {
+        return $self->hook_authentication($hd);
+    }
+    else {
+        # TODO: output error stuff?
+    }
+}
+
 sub hook_authentication {
-    1;
+    my $self = shift;
+    $self->run_hooks('authentication', @_);
 }
 
+sub hook_authentication_end {
+    my ($self, $ret, $out, $hd) = @_;
+    if ($ret == DECLINED || $ret == OK) {
+        return $self->hook_authorization($hd);
+    }
+    else {
+        # TODO: output error stuff?
+    }
+}
+
 sub hook_authorization {
-    1;
+    my $self = shift;
+    $self->run_hooks('authorization', @_);
 }
 
+sub hook_authorization_end {
+    my ($self, $ret, $out, $hd) = @_;
+    if ($ret == DECLINED || $ret == OK) {
+        return $self->hook_fixup($hd);
+    }
+    else {
+        # TODO: output error stuff?
+    }
+}
+
 sub hook_fixup {
-    1;
+    my $self = shift;
+    $self->run_hooks('fixup', @_);
 }
 
-sub hook_error {
-    my $self = shift;
-    $self->headers_out->code(SERVER_ERROR);
-    my ($ret) = $self->run_hooks('error', @_);
-    if ($ret != OK) {
-        $self->headers_out->header('Content-Type' => 'text/html; charset=UTF-8');
-        $self->send_http_headers;
-        $self->write(<<EOT);
-<!DOCTYPE HTML PUBLIC "-//IETF//DTD HTML 2.0//EN">
-<HTML><HEAD>
-<TITLE>500 Internal Server Error</TITLE>
-</HEAD><BODY>
-<H1>Internal Server Error</H1>
-The server encountered an internal error or
-misconfiguration and was unable to complete
-your request.<P>
-More information about this error may be available
-in the server error log.<P>
-<HR>
-</BODY></HTML>
-EOT
+sub hook_fixup_end {
+    my ($self, $ret, $out, $hd) = @_;
+    if ($ret == DECLINED || $ret == OK) {
+        return $self->hook_xmlresponse(AxKit2::Processor->new($self, $hd->filename), $hd);
     }
     else {
-        # we assume some hook handled the error
+        # TODO: output error stuff?
     }
 }
 
 sub hook_xmlresponse {
     my $self = shift;
-    my ($ret, $out) = $self->run_hooks('xmlresponse', @_);
+    $self->run_hooks('xmlresponse', @_);
+}
+
+sub hook_xmlresponse_end {
+    my ($self, $ret, $out, $input, $hd) = @_;
     if ($ret == DECLINED) {
-        return 0;
+        return $self->hook_response($hd);
     }
     elsif ($ret == OK) {
-        $out->output($self) if $out;
-        return 1; # stop
+        die "Cannot return OK without OUTPUT from xmlresponse" unless $out;
+        $out->output($self);
     }
     elsif ($ret == SERVER_ERROR) {
         $self->hook_error($out);
-        return 1; # stop
     }
     else {
         # TODO: handle errors
     }
+    
+    $self->write(sub { $self->http_response_sent() });
 }
 
 sub hook_response {
     my $self = shift;
-    my ($ret, $out) = $self->run_hooks('response', @_);
+    $self->run_hooks('response', @_);
+}
+
+sub hook_response_end {
+    my ($self, $ret, $out, $hd) = @_;
     if ($ret == DECLINED) {
         $self->headers_out->code(NOT_FOUND);
         $self->headers_out->header('Content-Type' => 'text/html; charset=UTF-8');
@@ -226,63 +365,66 @@
 <HR>
 </BODY></HTML>
 EOT
-        return;
     }
     elsif ($ret == OK) {
-        return 1;
+        # do nothing...
     }
     elsif ($ret == SERVER_ERROR) {
         $self->hook_error($out);
-        return 1; # stop
     }
     else {
         # TODO: output error stuff?
     }
+    
+    $self->write(sub { $self->hook_response_sent($self->headers_out->response_code) });
 }
 
-sub hook_body_data {
+sub hook_response_sent {
     my $self = shift;
-    my ($ret, $out) = $self->run_hooks('body_data', @_);
-    if ($ret == DECLINED) {
-        return;
-    }
+    $self->run_hooks('response_sent', @_);
+}
+
+sub hook_response_sent_end {
+    my ($self, $ret, $out, $code) = @_;
     if ($ret == DONE) {
-        $self->process_request();
-        return;
+        $self->close("plugin decided not to keep connection open");
     }
-    elsif ($ret == OK) {
-        return 1;
+    elsif ($ret == DECLINED || $ret == OK) {
+        return $self->http_response_sent;
     }
     else {
-        # TODO: output error stuff?
+        # TODO: errors?
     }
 }
 
-sub hook_mime_map {
+sub hook_error {
     my $self = shift;
-    my ($ret, $out) = $self->run_hooks('mime_map', @_);
-    if ($ret == DECLINED) {
-        return 1;
-    }
-    elsif ($ret == OK) {
-        return 1;
-    }
-    else {
-        # TODO: output error stuff?
-    }
+    $self->headers_out->code(SERVER_ERROR);
+    $self->run_hooks('error', @_);
 }
 
-sub hook_response_sent {
-    my $self = shift;
-    my ($ret, $out) = $self->run_hooks('response_sent', @_);
-    if ($ret == DONE) {
-        return 1;
+sub hook_error_end {
+    my ($self, $ret) = @_;
+    if ($ret != OK) {
+        $self->headers_out->header('Content-Type' => 'text/html; charset=UTF-8');
+        $self->send_http_headers;
+        $self->write(<<EOT);
+<!DOCTYPE HTML PUBLIC "-//IETF//DTD HTML 2.0//EN">
+<HTML><HEAD>
+<TITLE>500 Internal Server Error</TITLE>
+</HEAD><BODY>
+<H1>Internal Server Error</H1>
+The server encountered an internal error or
+misconfiguration and was unable to complete
+your request.<P>
+More information about this error may be available
+in the server error log.<P>
+<HR>
+</BODY></HTML>
+EOT
     }
-    elsif ($ret == DECLINED || $ret == OK) {
-        return;
-    }
     else {
-        # TODO: errors?
+        # we assume some hook handled the error
     }
 }
 

Modified: trunk/lib/AxKit2/Connection.pm
===================================================================
--- trunk/lib/AxKit2/Connection.pm	2006-08-21 21:25:09 UTC (rev 105)
+++ trunk/lib/AxKit2/Connection.pm	2006-08-21 21:25:46 UTC (rev 106)
@@ -35,15 +35,13 @@
     http_headers_sent
     notes
     sock_closed
+    pause_count
+    continuation
     );
 
 use constant CLEANUP_TIME => 5; # every N seconds
 use constant MAX_HTTP_HEADER_LENGTH => 102400; # 100k
 
-our $last_cleanup = 0;
-
-Danga::Socket->AddTimer(CLEANUP_TIME, \&_do_cleanup);
-
 sub new {
     my AxKit2::Connection $self = shift;
     my $sock = shift;
@@ -62,9 +60,9 @@
     $self->{notes} = {};
     
     $self->log(LOGINFO, "Connection from " . $self->peer_addr_string);
-    # allow connect hook to disconnect us
-    $self->hook_connect() or return;
     
+    $self->hook_connect();
+    
     return $self;
 }
 
@@ -74,6 +72,28 @@
     return (time() - $self->{create_time});
 }
 
+sub paused {
+    my AxKit2::Connection $self = shift;
+    return 1 if $self->{pause_count};
+    return 1 if $self->{closed};
+    return 0;
+}
+
+sub pause_read {
+    my AxKit2::Connection $self = shift;
+    $self->{pause_count}++;
+    $self->watch_read(0);
+}
+
+sub continue_read {
+    my AxKit2::Connection $self = shift;
+    $self->{pause_count}--;
+    if ($self->{pause_count} <= 0) {
+        $self->{pause_count} = 0;
+        $self->watch_read(1);
+    }
+}
+
 sub config {
     my AxKit2::Connection $self = shift;
     if ($self->{headers_in}) {
@@ -149,7 +169,7 @@
     
     $self->{ditch_leading_rn} = 0;
     
-    $self->process_request() if $self->{headers_in}->request_method =~ /GET|HEAD/;
+    $self->hook_post_read_request($self->{headers_in});
 }
 
 sub headers_out {
@@ -178,43 +198,13 @@
 sub process_request {
     my AxKit2::Connection $self = shift;
     my $hd = $self->{headers_in};
-    my $conf = $self->{server_config};
 
     $self->{headers_out} = AxKit2::HTTPHeaders->new_response;
     $self->{headers_out}->header(Date   => http_date());
     $self->{headers_out}->header(Server => "AxKit-2/v$AxKit2::VERSION");
     
-    $self->hook_uri_to_file($hd, $hd->request_uri)
-    &&
-    $self->hook_mime_map($hd, $hd->filename)
-    &&
-    $self->hook_access_control($hd)
-    &&
-    $self->hook_authentication($hd)
-    &&
-    $self->hook_authorization($hd)
-    &&
-    $self->hook_fixup($hd)
-    &&
-    (
-        $self->hook_xmlresponse(AxKit2::Processor->new($self, $hd->filename))
-        ||
-        $self->hook_response($hd)
-    );
-    
-    $self->write(sub { $self->http_response_sent() });
-
-#    use Devel::GC::Helper;
-#    use Data::Dumper;
-#    $Data::Dumper::Terse = 1;
-#    $Data::Dumper::Indent = 1;
-#    #$Data::Dumper::Deparse = 1;
-#    my $leaks = Devel::GC::Helper::sweep;
-#    foreach my $leak (@$leaks) {
-#        print "Leaked $leak\n";
-#        print Dumper($leak);
-#    }
-#    print "Total leaks: " . scalar(@$leaks) . "\n";                                                 
+    # This starts off the chain reaction of the main state machine
+    $self->hook_uri_translation($hd, $hd->request_uri);
 }
 
 # called when we've finished writing everything to a client and we need
@@ -223,11 +213,6 @@
 sub http_response_sent {
     my AxKit2::Connection $self = $_[0];
     
-    if ($self->hook_response_sent($self->{headers_out}->response_code)) {
-        $self->close("plugin");
-        return 0;
-    }
-    
     return 0 if $self->{sock_closed};
     
     # close if we're supposed to
@@ -269,6 +254,9 @@
     # pipeline in a read that we haven't read yet.
     $self->watch_read(1);
     $self->watch_write(0);
+    
+    $self->hook_pre_request();
+    
     return 1;
 }
 
@@ -276,6 +264,8 @@
 #    print "Connection DESTROY\n";
 }
 
+Danga::Socket->AddTimer(CLEANUP_TIME, \&_do_cleanup);
+
 # Cleanup routine to get rid of timed out sockets
 sub _do_cleanup {
     my $now = time;

Modified: trunk/lib/AxKit2/Constants.pm
===================================================================
--- trunk/lib/AxKit2/Constants.pm	2006-08-21 21:25:09 UTC (rev 105)
+++ trunk/lib/AxKit2/Constants.pm	2006-08-21 21:25:46 UTC (rev 106)
@@ -43,6 +43,7 @@
         SERVER_ERROR           => 500,
         DECLINED               => 909,
         DONE                   => 910,
+        CONTINUATION           => 911,
 );
 
 use vars qw(@ISA @EXPORT);

Modified: trunk/lib/AxKit2/Plugin.pm
===================================================================
--- trunk/lib/AxKit2/Plugin.pm	2006-08-21 21:25:09 UTC (rev 105)
+++ trunk/lib/AxKit2/Plugin.pm	2006-08-21 21:25:46 UTC (rev 106)
@@ -24,9 +24,9 @@
 # more or less in the order they will fire
 # DON'T FORGET - edit "AVAILABLE HOOKS" below.
 our @hooks = qw(
-    logging connect post_read_request body_data uri_translation access_control
-    authentication authorization mime_map xmlresponse response
-    response_sent disconnect error
+    logging connect pre_request post_read_request body_data uri_translation
+    mime_map access_control authentication authorization  fixup
+    xmlresponse response response_sent disconnect error
 );
 our %hooks = map { $_ => 1 } @hooks;