You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucy.apache.org by ka...@apache.org on 2012/01/03 03:33:55 UTC

[lucy-commits] svn commit: r1226609 - in /incubator/lucy/branches/0.3/perl: lib/LucyX/Remote/SearchServer.pm t/510-remote_search.t t/550-cluster_searcher.t

Author: karpet
Date: Tue Jan  3 02:33:54 2012
New Revision: 1226609

URL: http://svn.apache.org/viewvc?rev=1226609&view=rev
Log:
merge r1226462 and r1226463 (LUCY-205)

Modified:
    incubator/lucy/branches/0.3/perl/lib/LucyX/Remote/SearchServer.pm
    incubator/lucy/branches/0.3/perl/t/510-remote_search.t
    incubator/lucy/branches/0.3/perl/t/550-cluster_searcher.t

Modified: incubator/lucy/branches/0.3/perl/lib/LucyX/Remote/SearchServer.pm
URL: http://svn.apache.org/viewvc/incubator/lucy/branches/0.3/perl/lib/LucyX/Remote/SearchServer.pm?rev=1226609&r1=1226608&r2=1226609&view=diff
==============================================================================
--- incubator/lucy/branches/0.3/perl/lib/LucyX/Remote/SearchServer.pm (original)
+++ incubator/lucy/branches/0.3/perl/lib/LucyX/Remote/SearchServer.pm Tue Jan  3 02:33:54 2012
@@ -24,9 +24,7 @@ use Scalar::Util qw( reftype );
 
 # Inside-out member vars.
 our %searcher;
-our %port;
 our %password;
-our %sock;
 
 use IO::Socket::INET;
 use IO::Select;
@@ -35,33 +33,18 @@ sub new {
     my ( $either, %args ) = @_;
     my $searcher = delete $args{searcher};
     my $password = delete $args{password};
-    my $port     = delete $args{port};
     my $self     = $either->SUPER::new(%args);
     $searcher{$$self} = $searcher;
     confess("Missing required param 'password'") unless defined $password;
     $password{$$self} = $password;
 
-    # Establish a listening socket.
-    $port{$$self} = $port;
-    confess("Invalid port: $port") unless $port =~ /^\d+$/;
-    my $sock = IO::Socket::INET->new(
-        LocalPort => $port,
-        Proto     => 'tcp',
-        Listen    => SOMAXCONN,
-        Reuse     => 1,
-    );
-    confess("No socket: $!") unless $sock;
-    $sock{$$self} = $sock;
-
     return $self;
 }
 
 sub DESTROY {
     my $self = shift;
     delete $searcher{$$self};
-    delete $port{$$self};
     delete $password{$$self};
-    delete $sock{$$self};
     $self->SUPER::DESTROY;
 }
 
@@ -76,9 +59,18 @@ my %dispatch = (
 );
 
 sub serve {
-    my $self      = shift;
-    my $main_sock = $sock{$$self};
-    my $read_set  = IO::Select->new($main_sock);
+    my ( $self, %args ) = @_;
+    # Establish a listening socket.
+    my $port = delete $args{port};
+    confess("Invalid port: $port") unless $port =~ /^\d+$/;
+    my $main_sock = IO::Socket::INET->new(
+        LocalPort => $port,
+        Proto     => 'tcp',
+        Listen    => SOMAXCONN,
+        Reuse     => 1,
+    );
+    confess("No socket: $!") unless $main_sock;
+    my $read_set = IO::Select->new($main_sock);
 
     while ( my @ready = $read_set->can_read ) {
         for my $readhandle (@ready) {
@@ -90,41 +82,16 @@ sub serve {
             # Otherwise it's a client sock, so process the request.
             else {
                 my $client_sock = $readhandle;
-                my ( $check_val, $buf, $len );
-                $check_val = $client_sock->read( $buf, 4 );
-                if ( $check_val == 0 ) {
-                    # If read returns 0, socket has been closed cleanly at
-                    # the other end.
-                    $read_set->remove($client_sock);
-                    next;
-                }
-                confess $! unless $check_val == 4;
-                $len = unpack( 'N', $buf );
-                $check_val = $client_sock->read( $buf, $len );
-                confess $! unless $check_val == $len;
-                my $args = eval { thaw($buf) };
-                confess $@ if $@;
-                confess "Not a hashref" unless reftype($args) eq 'HASH';
-                my $method = delete $args->{_action};
+                my $status      = $self->serve_rpc($client_sock);
 
                 # If "done", the client's closing.
-                if ( $method eq 'done' ) {
+                if ( $status eq 'done' ) {
                     $read_set->remove($client_sock);
                     $client_sock->close;
                     next;
                 }
-
-                # Process the method call.
-                $dispatch{$method}
-                    or confess "ERROR: Bad method name: $method\n";
-                my $response   = $dispatch{$method}->( $self, $args );
-                my $frozen     = nfreeze($response);
-                my $packed_len = pack( 'N', length($frozen) );
-                print $client_sock "$packed_len$frozen"
-                    or confess $!;
-
                 # Remote signal to close the server.
-                if ( $method eq 'terminate' ) {
+                elsif ( $status eq 'terminate' ) {
                     my @all_handles = $read_set->handles;
                     $read_set->remove( \@all_handles );
                     $client_sock->close;
@@ -136,6 +103,40 @@ sub serve {
     }
 }
 
+sub serve_rpc {
+    my ( $self, $client_sock ) = @_;
+    my ( $check_val, $buf, $len );
+    $check_val = $client_sock->read( $buf, 4 );
+    # If read returns 0, socket has been closed cleanly at
+    # the other end.
+    return 'done' if $check_val == 0;
+    confess $! unless $check_val == 4;
+    $len = unpack( 'N', $buf );
+    $check_val = $client_sock->read( $buf, $len );
+    confess $! unless $check_val == $len;
+    my $args = eval { thaw($buf) };
+    confess $@ if $@;
+    confess "Not a hashref" unless reftype($args) eq 'HASH';
+    my $method = delete $args->{_action};
+
+    # If "done", the client's closing.
+    return $method if $method eq 'done';
+
+    # Process the method call.
+    $dispatch{$method}
+        or confess "ERROR: Bad method name: $method\n";
+    my $response   = $dispatch{$method}->( $self, $args );
+    my $frozen     = nfreeze($response);
+    my $packed_len = pack( 'N', length($frozen) );
+    print $client_sock "$packed_len$frozen"
+        or confess $!;
+
+    # Remote signal to close the server.
+    return $method if $method eq 'terminate';
+
+    return 'continue';
+}
+
 sub do_handshake {
     my ( $self, $args ) = @_;
     my $retval = 1;
@@ -196,10 +197,11 @@ LucyX::Remote::SearchServer - Make a Sea
         index => '/path/to/index' 
     );
     my $search_server = LucyX::Remote::SearchServer->new(
-        searcher => $searcher,
-        port     => 7890,
+        searcher => $searcher
+    );
+    $search_server->serve(
+        port => 7890
     );
-    $search_server->serve;
 
 =head1 DESCRIPTION 
 
@@ -218,7 +220,6 @@ distributed across multiple nodes, each 
 
     my $search_server = LucyX::Remote::SearchServer->new(
         searcher => $searcher, # required
-        port     => 7890,      # required
         password => $pass,     # required
     );
 
@@ -233,10 +234,6 @@ will wrap.
 
 =item *
 
-B<port> - the port on localhost that the server should open and listen on.
-
-=item *
-
 B<password> - a required password, which must also be supplied by
 clients.
 
@@ -244,8 +241,26 @@ by clients.
 
 =head2 serve
 
-    $search_server->serve;
+    $search_server->serve(
+        port => 7890,      # required
+    );
 
 Open a listening socket on localhost and wait for SearchClients to connect.
 
+=over
+
+=item *
+
+B<port> - the port on localhost that the server should open and listen on.
+
+=back
+
+=head2 serve_rpc
+
+    my $status = $search_server->serve_rpc($sock);
+
+Handle a single RPC from socket $sock. Returns 'done' if the connection should
+be closed. Returns 'terminate' if the server should shut down. Returns
+'continue' if the server should continue to handle requests from this client.
+
 =cut

Modified: incubator/lucy/branches/0.3/perl/t/510-remote_search.t
URL: http://svn.apache.org/viewvc/incubator/lucy/branches/0.3/perl/t/510-remote_search.t?rev=1226609&r1=1226608&r2=1226609&view=diff
==============================================================================
--- incubator/lucy/branches/0.3/perl/t/510-remote_search.t (original)
+++ incubator/lucy/branches/0.3/perl/t/510-remote_search.t Tue Jan  3 02:33:54 2012
@@ -71,11 +71,10 @@ else {
 
     my $searcher = Lucy::Search::IndexSearcher->new( index => $folder );
     my $server = LucyX::Remote::SearchServer->new(
-        port     => $PORT_NUM,
         searcher => $searcher,
         password => 'foo',
     );
-    $server->serve;
+    $server->serve( port => $PORT_NUM );
     exit(0);
 }
 

Modified: incubator/lucy/branches/0.3/perl/t/550-cluster_searcher.t
URL: http://svn.apache.org/viewvc/incubator/lucy/branches/0.3/perl/t/550-cluster_searcher.t?rev=1226609&r1=1226608&r2=1226609&view=diff
==============================================================================
--- incubator/lucy/branches/0.3/perl/t/550-cluster_searcher.t (original)
+++ incubator/lucy/branches/0.3/perl/t/550-cluster_searcher.t Tue Jan  3 02:33:54 2012
@@ -72,7 +72,7 @@ for my $port (@ports) {
         for (qw( a b c )) {
             my %doc = (
                 content => "x $_ $port",
-                junk    => "xyz " x 4000, # should trigger partial reads
+                junk    => "xyz " x 4000,    # should trigger partial reads
                 number  => $number,
                 port    => $port,
             );
@@ -83,11 +83,10 @@ for my $port (@ports) {
 
         my $searcher = Lucy::Search::IndexSearcher->new( index => $folder );
         my $server = LucyX::Remote::SearchServer->new(
-            port     => $port,
             searcher => $searcher,
             password => 'foo',
         );
-        $server->serve;
+        $server->serve( port => $port );
         exit(0);
     }
 }