You are viewing a plain text version of this content. The canonical link for it is here.
Posted to axkit-dev@xml.apache.org by ma...@sergeant.org on 2006/08/13 04:19:46 UTC

[SVN] [80] Console server and various cleanups/fixes

Revision: 80
Author:   matt
Date:     2006-08-13 02:19:05 +0000 (Sun, 13 Aug 2006)

Log Message:
-----------
Console server and various cleanups/fixes

Modified Paths:
--------------
    trunk/lib/AxKit2/Config/Global.pm
    trunk/lib/AxKit2/Config.pm
    trunk/lib/AxKit2/Connection.pm
    trunk/lib/AxKit2/Processor.pm
    trunk/lib/AxKit2/Server.pm
    trunk/lib/AxKit2.pm

Added Paths:
-----------
    trunk/lib/AxKit2/Console.pm

Modified: trunk/lib/AxKit2/Config/Global.pm
===================================================================
--- trunk/lib/AxKit2/Config/Global.pm	2006-08-13 02:18:36 UTC (rev 79)
+++ trunk/lib/AxKit2/Config/Global.pm	2006-08-13 02:19:05 UTC (rev 80)
@@ -22,6 +22,18 @@
     $self->{DocumentRoot};
 }
 
+sub console_port {    # accessor: optionally store, then return, {ConsolePort}
+    my ($self, @new_value) = @_;
+    $self->{ConsolePort} = $new_value[0] if @new_value;
+    return $self->{ConsolePort};
+}
+
+sub console_addr {    # accessor: optionally store, then return, {ConsoleAddr}
+    my ($self, @new_value) = @_;
+    $self->{ConsoleAddr} = $new_value[0] if @new_value;
+    return $self->{ConsoleAddr};
+}
+
 sub styleroot {
     my $self = shift;
     @_ and $self->{StylesheetRoot} = shift;
@@ -58,4 +70,4 @@
     $self->{Notes}{$key};
 }
 
-1;
\ No newline at end of file
+1;

Modified: trunk/lib/AxKit2/Config.pm
===================================================================
--- trunk/lib/AxKit2/Config.pm	2006-08-13 02:18:36 UTC (rev 79)
+++ trunk/lib/AxKit2/Config.pm	2006-08-13 02:19:05 UTC (rev 80)
@@ -14,8 +14,9 @@
     Plugin => [\&TAKE1, sub { my $conf = shift; AxKit2::Client->load_plugin($conf, $_[0]); $conf->add_plugin($_[0]); }],
     Port   => [\&TAKE1, sub { my $conf = shift; $conf->port($_[0]) }],
     DocumentRoot => [\&TAKE1, sub { my $conf = shift; $conf->docroot($_[0]) }],
-    StylesheetRoot => [\&TAKE1, sub { my $conf = shift; $conf->styleroot($_[0]) }],
     DirectoryIndex => [\&TAKE1, sub { my $conf = shift; $conf->dirindex($_[0]) }],
+    ConsolePort => [\&TAKE1, sub { my $conf = shift; $conf->isa('AxKit2::Config::Global') || die "ConsolePort only allowed at global level"; $conf->console_port($_[0]) }],
+    ConsoleAddr => [\&TAKE1, sub { my $conf = shift; $conf->isa('AxKit2::Config::Global') || die "ConsoleAddr only allowed at global level"; $conf->console_addr($_[0]) }],
     );
 
 our $GLOBAL = AxKit2::Config::Global->new();
@@ -190,4 +191,4 @@
     return @vals;
 }
 
-1;
\ No newline at end of file
+1;

Modified: trunk/lib/AxKit2/Connection.pm
===================================================================
--- trunk/lib/AxKit2/Connection.pm	2006-08-13 02:18:36 UTC (rev 79)
+++ trunk/lib/AxKit2/Connection.pm	2006-08-13 02:19:05 UTC (rev 80)
@@ -53,6 +53,12 @@
     return $self;
 }
 
+sub uptime {    # seconds elapsed since this connection's recorded create_time
+    my AxKit2::Connection $conn = shift;
+    my $started = $conn->{create_time};
+    return time() - $started;
+}
+
 sub config {
     my AxKit2::Connection $self = shift;
     if ($self->{headers_in}) {
@@ -246,6 +252,9 @@
     return 1;
 }
 
+sub DESTROY {    # empty destructor; the commented-out print below is a disabled debug trace
+#    print "Connection DESTROY\n";
+}
 
 # Cleanup routine to get rid of timed out sockets
 sub _do_cleanup {
@@ -256,6 +265,8 @@
     Danga::Socket->AddTimer(CLEANUP_TIME, \&_do_cleanup);
     
     my $sf = __PACKAGE__->get_sock_ref;
+    
+    my $conns = 0;
 
     my %max_age;  # classname -> max age (0 means forever)
     my %max_connect; # classname -> max connect time
@@ -264,26 +275,24 @@
         my AxKit2::Connection $v = $sf->{$k};
         my $ref = ref $v;
         next unless $v->isa('AxKit2::Connection');
+        $conns++;
         unless (defined $max_age{$ref}) {
             $max_age{$ref}      = $ref->max_idle_time || 0;
             $max_connect{$ref}  = $ref->max_connect_time || 0;
         }
-        AxKit2::Client->log(LOGDEBUG, "got a Connection. Max age: $max_age{$ref}, Max Connect: $max_connect{$ref}");
         if (my $t = $max_connect{$ref}) {
-            AxKit2::Client->log(LOGDEBUG, "connection time: $v->{create_time} < " . ($now - $t));
             if ($v->{create_time} < $now - $t) {
                 push @to_close, $v;
                 next;
             }
         }
         if (my $t = $max_age{$ref}) {
-            AxKit2::Client->log(LOGDEBUG, "alive time: $v->{alive_time} < " . ($now - $t));
             if ($v->{alive_time} < $now - $t) {
                 push @to_close, $v;
             }
         }
     }
-
+    
     $_->close("Timeout") foreach @to_close;
 }
 

Added: trunk/lib/AxKit2/Console.pm
===================================================================
--- trunk/lib/AxKit2/Console.pm	2006-08-13 02:18:36 UTC (rev 79)
+++ trunk/lib/AxKit2/Console.pm	2006-08-13 02:19:05 UTC (rev 80)
@@ -0,0 +1,314 @@
+package AxKit2::Console;
+
+use strict;
+use warnings;
+
+use IO::Socket;
+use AxKit2::Constants;
+use Socket qw(IPPROTO_TCP TCP_NODELAY);
+
+use base 'Danga::Socket';
+
+use fields qw(
+    alive_time
+    create_time
+    line
+    );
+    
+use constant CLEANUP_TIME => 5; # seconds
+
+our $PROMPT = "Enter command (or \"HELP\" for help)\n";
+
+Danga::Socket->AddTimer(CLEANUP_TIME, \&_do_cleanup);
+
+sub create {
+    my $class    = shift;
+    my $config = shift;
+    # Class method: open the console's listening socket as described by $config.
+    my $PORT = $config->console_port;
+    # A false console_port (none configured) means no console is started.
+    return unless $PORT;
+    # Listen on console_addr, falling back to 127.0.0.1 when it is not set.
+    my $sock = IO::Socket::INET->new(
+            LocalAddr => $config->console_addr || '127.0.0.1',
+            LocalPort => $PORT,
+            Proto     => 'tcp',
+            Type      => SOCK_STREAM,
+            Blocking  => 0,
+            Reuse     => 1,
+            Listen    => SOMAXCONN )
+               or die "Error creating server on port $PORT : $@\n";
+    # Switch the listening socket to non-blocking mode.
+    IO::Handle::blocking($sock, 0);
+    # Callback registered for the listening fd: accept one client and wrap it in a Console.
+    my $accept_handler = sub {
+        my $csock = $sock->accept;
+        return unless $csock;
+        # NOTE(review): the debug message below names AxKit2::Connection although a Console is built.
+        if ($::DEBUG) {
+            AxKit2::Client->log(LOGDEBUG, "Listen child making a AxKit2::Connection for ", fileno($csock));
+        }
+        # Make the client socket non-blocking and set TCP_NODELAY; dies if setsockopt fails.
+        IO::Handle::blocking($csock, 0);
+        setsockopt($csock, IPPROTO_TCP, TCP_NODELAY, pack("l", 1)) or die;
+        # Build the Console inside eval: on success start watching for reads, else re-die with $@.
+        if (my $client = eval { AxKit2::Console->new($csock, $config) }) {
+            $client->watch_read(1);
+            return;
+        } else {
+            die("Error creating new Console: $@") if $@;
+        }
+    };
+    # Hand the listening socket's fd and its callback to Danga::Socket.
+    Danga::Socket->AddOtherFds(fileno($sock) => $accept_handler);
+}
+
+sub max_idle_time       { 30 }     # idle limit in seconds; _do_cleanup compares it against alive_time
+sub max_connect_time    { 180 }    # total-connection limit in seconds; _do_cleanup compares it against create_time
+sub event_err { my AxKit2::Connection $self = shift; $self->close("Error") }    # error event: close with reason "Error"
+sub event_hup { my AxKit2::Connection $self = shift; $self->close("Disconnect (HUP)") }    # hang-up event: close with reason "Disconnect (HUP)"
+
+sub new {
+    my $self = shift;
+    my $sock = shift;
+    my $conf = shift;
+    $self = fields::new($self) unless ref($self);
+    # Constructor: hands $sock to the parent class; $conf is accepted but not used here.
+    $self->SUPER::new($sock);
+    # Both timestamps start at "now"; {line} buffers input that has no newline yet.
+    my $now = time;
+    $self->{alive_time} = $self->{create_time} = $now;
+    $self->{line} = '';
+    # Greet the new client with the prompt.
+    $self->write($PROMPT);
+    
+    return $self;
+}
+
+sub event_read {
+    my AxKit2::Console $self = shift;
+    $self->{alive_time} = time;
+    # Read handler: alive_time was just refreshed (it drives the idle timeout in _do_cleanup).
+    my $bref = $self->read(8192);
+    return $self->close($!) unless defined $bref;    # undef from read(): close, passing $! as the reason
+    $self->process_read_buf($bref);
+}
+
+sub process_read_buf {
+    my AxKit2::Console $self = shift;
+    my ($chunk_ref) = @_;
+    # Append the chunk, then dispatch each complete line; a partial tail stays buffered.
+    $self->{line} .= ${$chunk_ref};
+    while ($self->{line} =~ s/^(.*?\n)//) {
+        my $complete = $1;
+        $self->process_line($complete);
+    }
+}
+
+sub process_line {
+    # Parse one input line as "COMMAND [ARGS...]" and dispatch it to the matching
+    # cmd_<command> method (case-insensitive); the prompt is re-sent afterwards.
+    my AxKit2::Console $self = shift;
+    my $line = shift;
+    $line =~ s/\r?\n//;
+    # split ' ' skips leading whitespace, so " help" no longer parses as command ''.
+    my ($cmd, @params) = split(' ', $line);
+    # A blank line carries no command: just prompt again instead of warning on undef.
+    return $self->write($PROMPT) unless defined $cmd;
+    if (my $lookup = $self->can("cmd_" . lc($cmd))) {
+        $lookup->($self, @params);
+        return $self->write($PROMPT);
+    }
+    return $self->write("command '$cmd' unrecognised\n");    # no such cmd_* method
+}
+
+my %helptext;
+
+$helptext{help} = "HELP [CMD] - Get help on all commands or a specific command";
+
+sub cmd_help {
+    # HELP [CMD]: with no argument (or "help") list every command's usage stub,
+    # then in all cases print the full help line for the requested command.
+    my ($self, $subcmd) = @_;
+    $subcmd = lc($subcmd || 'help');
+    if ($subcmd eq 'help') {
+        # Keep only the text before the first "-" of each help line.
+        my @usages = map { substr($_, 0, index($_, "-")) } sort values(%helptext);
+        $self->write("Available Commands:\n\n" . join("\n", @usages) . "\n");
+    }
+    my $detail = $helptext{$subcmd};
+    $detail = "Unrecognised help option. Try 'help' for a full list." unless $detail;
+    $self->write("$detail\n");
+}
+
+$helptext{quit} = "QUIT - Exit the console";
+sub cmd_quit {    # QUIT: close this console connection
+    my ($self) = @_;
+    return $self->close;
+}
+
+$helptext{list} = "LIST [LIMIT] - List current connections, specify limit or negative limit to shrink list";
+sub cmd_list {
+    # LIST [LIMIT]: write one line per AxKit2::Connection found in the Danga::Socket
+    # descriptor map, sorted by ascending uptime. A positive LIMIT keeps the LIMIT
+    # oldest connections, a negative LIMIT the newest; no LIMIT (or 0) lists all.
+    my $self = shift;
+    my ($count) = @_;
+    # Reject a LIMIT that is not an integer instead of warning in the numeric tests below.
+    return $self->write("SYNTAX: LIST [LIMIT]\n") if $count && $count !~ /^-?\d+$/;
+    my $descriptors = Danga::Socket->DescriptorMap;
+    my $list = "Current" . ($count ? (($count > 0) ? " Oldest $count" : " Newest ".-$count) : "") . " Connections: \n\n";
+    my @all;
+    foreach my $fd (keys %$descriptors) {
+        my $pob = $descriptors->{$fd};
+        if ($pob->isa("AxKit2::Connection")) {
+            next unless $pob->peer_addr_string; # haven't even started yet
+            push @all, [$pob+0, $pob->peer_addr_string, $pob->uptime];
+        }
+    }
+    @all = sort { $a->[2] <=> $b->[2] } @all;
+    if ($count) {
+        # Clamp to the rows we actually have: an out-of-range slice yields undef
+        # rows (or repeats), and dereferencing an undef row below would die.
+        my $keep = abs($count) < @all ? abs($count) : scalar(@all);
+        @all = ($count > 0) ? @all[-$keep .. -1] : @all[0 .. $keep - 1];
+    }
+    foreach my $item (@all) {
+        # Three values per row, so exactly three conversions (reference, peer, uptime).
+        $list .= sprintf("%x : %s Connected %0.2fs\n", map { defined()?$_:'' } @$item);
+    }
+    $self->write( $list );
+}
+
+$helptext{kill} = "KILL (\$IP | \$REF) - Disconnect all connections from \$IP or connection reference \$REF";
+sub cmd_kill {
+    # KILL ($IP | $REF): close every AxKit2::Connection whose peer IP equals $IP, or
+    # the connection whose reference (the hex id printed by LIST) equals $REF, then
+    # report how many connections were closed.
+    my $self = shift;
+    my ($match) = @_;
+    
+    return $self->write("SYNTAX: KILL (\$IP | \$REF)\n") unless $match;
+    
+    my $descriptors = Danga::Socket->DescriptorMap;
+    
+    my $killed = 0;
+    my $is_ip = (index($match, '.') >= 0);    # a dot means "treat the argument as an IP"
+    foreach my $fd (keys %$descriptors) {
+        my $pob = $descriptors->{$fd};
+        # This server's clients are AxKit2::Connection objects; the earlier test for
+        # Qpsmtpd::PollServer could never match, so KILL never closed anything.
+        next unless $pob->isa("AxKit2::Connection");
+        my $hit;
+        if ($is_ip) {
+            my $ip = $pob->peer_ip_string;    # Danga::Socket: peer IP without the port
+            $hit = defined($ip) && $ip eq $match;
+        }
+        else {
+            $hit = ($pob+0 == hex($match));   # match by ID
+        }
+        next unless $hit;
+        $pob->close("Killed from console");
+        $killed++;
+    }
+    
+    # "1 connection", otherwise "N connections" (including 0).
+    $self->write("Killed $killed connection" . ($killed == 1 ? "" : "s") . "\n");
+}
+
+$helptext{dump} = "DUMP \$REF - Dump a connection using Data::Dumper";
+sub cmd_dump {
+    # DUMP $REF: write a Data::Dumper dump of the AxKit2::Connection whose reference
+    # (the hex id printed by LIST) is $REF, or an error line when none matches.
+    my $self = shift;
+    my ($ref) = @_;
+    # Without a $REF there is nothing to look up (and hex(undef) would only warn).
+    return $self->write("SYNTAX: DUMP \$REF\n") unless $ref;
+    require Data::Dumper;
+    # local: restore the process-wide Data::Dumper settings when this sub returns.
+    local $Data::Dumper::Indent = 1;
+    local $Data::Dumper::Terse  = 1;
+    my $wanted = hex($ref);
+    my $descriptors = Danga::Socket->DescriptorMap;
+    foreach my $fd (keys %$descriptors) {
+        my $pob = $descriptors->{$fd};
+        next unless $pob->isa("AxKit2::Connection") && $pob+0 == $wanted;
+        return $self->write( Data::Dumper::Dumper($pob) );
+    }
+    $self->write("Unable to find the connection: $ref. Try the LIST command\n");
+}
+
+sub DBI::FIRSTKEY {}    # NOTE(review): empty stub; its purpose is not visible here - presumably it keeps the DUMP/LEAKS walkers from failing on tied DBI handles; confirm
+
+$helptext{leaks} = "LEAKS [DUMP] - Run Devel::GC::Helper to list leaks with optional Dumper output";
+my %prev_leaks;
+sub cmd_leaks {
+    my $self = shift;
+    my $dump = shift || '';
+    $dump = (uc($dump) eq 'DUMP') ? 1 : 0;
+    # LEAKS [DUMP]: report what Devel::GC::Helper::sweep() returns; "DUMP" (any case) adds Dumper output.
+    require Devel::GC::Helper;
+    if ($dump) {
+        require Data::Dumper;
+        $Data::Dumper::Terse = 1;
+        $Data::Dumper::Indent = 1;
+        #$Data::Dumper::Deparse = 1;
+    }
+    # Fork: the parent returns at once; the sweep and its output happen in the child.
+    my $pid = fork;
+    die "Can't fork" unless defined $pid;
+    return if $pid;
+
+    # Child - run the leak sweep...
+    my $leaks = Devel::GC::Helper::sweep();
+    foreach my $leak (@$leaks) {
+        $self->write("Leaked $leak\n");
+        $self->write( Data::Dumper::Dumper($leak) ) if $dump;
+    }
+    $self->write( "Total leaks: " . scalar(@$leaks) . "\n");
+    $self->write($PROMPT);
+    # The child exits after writing its report and a fresh prompt.
+    exit;
+}
+
+# Cleanup routine to get rid of timed out sockets
+sub _do_cleanup {
+    my $now = time;
+    # Timer callback: close AxKit2::Console sockets that exceeded their idle or total-connect limit.
+    # AxKit2::Client->log(LOGDEBUG, "do cleanup");
+    # Re-arm the timer so this routine runs again after CLEANUP_TIME seconds.
+    Danga::Socket->AddTimer(CLEANUP_TIME, \&_do_cleanup);
+    # $sf is used below as a hash ref whose values are the socket objects to examine.
+    my $sf = __PACKAGE__->get_sock_ref;
+    # $conns counts the consoles seen; it is not read again in this routine.
+    my $conns = 0;
+
+    my %max_age;  # classname -> max age (0 means forever)
+    my %max_connect; # classname -> max connect time
+    my @to_close;
+    while (my $k = each %$sf) {
+        my AxKit2::Connection $v = $sf->{$k};    # NOTE(review): typed as AxKit2::Connection although only AxKit2::Console objects pass the isa test below
+        my $ref = ref $v;
+        next unless $v->isa('AxKit2::Console');
+        $conns++;
+        unless (defined $max_age{$ref}) {
+            $max_age{$ref}      = $ref->max_idle_time || 0;
+            $max_connect{$ref}  = $ref->max_connect_time || 0;
+        }
+        if (my $t = $max_connect{$ref}) {
+            if ($v->{create_time} < $now - $t) {
+                push @to_close, $v;
+                next;
+            }
+        }
+        if (my $t = $max_age{$ref}) {
+            if ($v->{alive_time} < $now - $t) {
+                push @to_close, $v;
+            }
+        }
+    }
+    # Closing is deferred until after the loop - presumably so the map is not modified while each() iterates.
+    $_->close("Timeout") foreach @to_close;
+}
+
+1;

Modified: trunk/lib/AxKit2/Processor.pm
===================================================================
--- trunk/lib/AxKit2/Processor.pm	2006-08-13 02:18:36 UTC (rev 79)
+++ trunk/lib/AxKit2/Processor.pm	2006-08-13 02:19:05 UTC (rev 80)
@@ -106,6 +106,7 @@
             print $fh ($dom || $self->dom)->toString;
         }
         ($dom, $outfunc) = $trans->transform($pos++, $self);
+        # $trans->client(undef);
         $self->dom($dom);
     }
     

Modified: trunk/lib/AxKit2/Server.pm
===================================================================
--- trunk/lib/AxKit2/Server.pm	2006-08-13 02:18:36 UTC (rev 79)
+++ trunk/lib/AxKit2/Server.pm	2006-08-13 02:19:05 UTC (rev 80)
@@ -9,15 +9,13 @@
 use AxKit2::Constants;
 use AxKit2::Client;
 
-our @servers;
-
 sub create {
     my $class    = shift;
     my $servconf = shift;
     
     my $PORT = $servconf->port;
     my $sock = IO::Socket::INET->new(
-            LocalPort => $servconf->port,
+            LocalPort => $PORT,
             Proto     => 'tcp',
             Type      => SOCK_STREAM,
             Blocking  => 0,
@@ -27,8 +25,6 @@
 
     IO::Handle::blocking($sock, 0);
     
-    push @servers, $sock;
-    
     my $accept_handler = sub {
         my $csock = $sock->accept;
         return unless $csock;
@@ -51,4 +47,4 @@
     Danga::Socket->AddOtherFds(fileno($sock) => $accept_handler);
 }
 
-1;
\ No newline at end of file
+1;

Modified: trunk/lib/AxKit2.pm
===================================================================
--- trunk/lib/AxKit2.pm	2006-08-13 02:18:36 UTC (rev 79)
+++ trunk/lib/AxKit2.pm	2006-08-13 02:19:05 UTC (rev 80)
@@ -5,6 +5,7 @@
 use AxKit2::Client;
 use AxKit2::Server;
 use AxKit2::Config;
+use AxKit2::Console;
 
 our $VERSION = '1.0';
 
@@ -16,6 +17,9 @@
     
     local $SIG{'PIPE'} = "IGNORE";  # handled manually
     
+    # console server
+    AxKit2::Console->create(AxKit2::Config->global);
+    
     # setup server
     for my $server ($config->servers) {
         AxKit2::Server->create($server);
@@ -24,4 +28,54 @@
     Danga::Socket->EventLoop();
 }
 
-1;
\ No newline at end of file
+1;
+
+=head1 NAME
+
+AxKit2 - XML Application Server
+
+=head1 SYNOPSIS
+
+Just start the server:
+
+  ./axkit
+
+To do anything complex read the documentation and start writing plugins.
+
+=head1 DESCRIPTION
+
+AxKit2 is the second generation XML Application Server following in the
+footsteps of AxKit-1 (ONE). AxKit makes content generation easy by providing
+powerful tools to push XML through stylesheets. This helps ensure your web
+applications don't suffer from XSS bugs, and provides standardised templating
+tools so that your template authors don't need to learn a new Perl templating
+tool.
+
+=head1 PLUGINS
+
+Everything AxKit2 does is controlled by a plugin, and thus a lot of the
+documentation for things that AxKit2 does is held within the plugin itself.
+
+=head1 Why 2.0?
+
+In creating AxKit2 the following goals were aimed for:
+
+=over 4
+
+=item * Make it easier to set up and get started with than before.
+
+=item * Make it faster.
+
+=item * Make building complex web applications easier.
+
+=item * Make it easy to extend and hack on.
+
+=item * Make complex pipelines and caching schemes easier.
+
+=back
+
+Many people wanted a straight port to Apache2/mod_perl2, so that they could
+get their AxKit code migrated off the Apache1.x platform. This would have been
+one route to go down, a route which we looked at very seriously. However it is
+not the path we chose for a number of reasons which you can find in the AxKit
+mailing list archives.