You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucy.apache.org by ka...@apache.org on 2012/01/03 03:33:55 UTC
[lucy-commits] svn commit: r1226609 - in /incubator/lucy/branches/0.3/perl:
lib/LucyX/Remote/SearchServer.pm t/510-remote_search.t
t/550-cluster_searcher.t
Author: karpet
Date: Tue Jan 3 02:33:54 2012
New Revision: 1226609
URL: http://svn.apache.org/viewvc?rev=1226609&view=rev
Log:
merge r1226462 and r1226463 (LUCY-205)
Modified:
incubator/lucy/branches/0.3/perl/lib/LucyX/Remote/SearchServer.pm
incubator/lucy/branches/0.3/perl/t/510-remote_search.t
incubator/lucy/branches/0.3/perl/t/550-cluster_searcher.t
Modified: incubator/lucy/branches/0.3/perl/lib/LucyX/Remote/SearchServer.pm
URL: http://svn.apache.org/viewvc/incubator/lucy/branches/0.3/perl/lib/LucyX/Remote/SearchServer.pm?rev=1226609&r1=1226608&r2=1226609&view=diff
==============================================================================
--- incubator/lucy/branches/0.3/perl/lib/LucyX/Remote/SearchServer.pm (original)
+++ incubator/lucy/branches/0.3/perl/lib/LucyX/Remote/SearchServer.pm Tue Jan 3 02:33:54 2012
@@ -24,9 +24,7 @@ use Scalar::Util qw( reftype );
# Inside-out member vars.
our %searcher;
-our %port;
our %password;
-our %sock;
use IO::Socket::INET;
use IO::Select;
@@ -35,33 +33,18 @@ sub new {
my ( $either, %args ) = @_;
my $searcher = delete $args{searcher};
my $password = delete $args{password};
- my $port = delete $args{port};
my $self = $either->SUPER::new(%args);
$searcher{$$self} = $searcher;
confess("Missing required param 'password'") unless defined $password;
$password{$$self} = $password;
- # Establish a listening socket.
- $port{$$self} = $port;
- confess("Invalid port: $port") unless $port =~ /^\d+$/;
- my $sock = IO::Socket::INET->new(
- LocalPort => $port,
- Proto => 'tcp',
- Listen => SOMAXCONN,
- Reuse => 1,
- );
- confess("No socket: $!") unless $sock;
- $sock{$$self} = $sock;
-
return $self;
}
sub DESTROY {
my $self = shift;
delete $searcher{$$self};
- delete $port{$$self};
delete $password{$$self};
- delete $sock{$$self};
$self->SUPER::DESTROY;
}
@@ -76,9 +59,18 @@ my %dispatch = (
);
sub serve {
- my $self = shift;
- my $main_sock = $sock{$$self};
- my $read_set = IO::Select->new($main_sock);
+ my ( $self, %args ) = @_;
+ # Establish a listening socket.
+ my $port = delete $args{port};
+ confess("Invalid port: $port") unless $port =~ /^\d+$/;
+ my $main_sock = IO::Socket::INET->new(
+ LocalPort => $port,
+ Proto => 'tcp',
+ Listen => SOMAXCONN,
+ Reuse => 1,
+ );
+ confess("No socket: $!") unless $main_sock;
+ my $read_set = IO::Select->new($main_sock);
while ( my @ready = $read_set->can_read ) {
for my $readhandle (@ready) {
@@ -90,41 +82,16 @@ sub serve {
# Otherwise it's a client sock, so process the request.
else {
my $client_sock = $readhandle;
- my ( $check_val, $buf, $len );
- $check_val = $client_sock->read( $buf, 4 );
- if ( $check_val == 0 ) {
- # If read returns 0, socket has been closed cleanly at
- # the other end.
- $read_set->remove($client_sock);
- next;
- }
- confess $! unless $check_val == 4;
- $len = unpack( 'N', $buf );
- $check_val = $client_sock->read( $buf, $len );
- confess $! unless $check_val == $len;
- my $args = eval { thaw($buf) };
- confess $@ if $@;
- confess "Not a hashref" unless reftype($args) eq 'HASH';
- my $method = delete $args->{_action};
+ my $status = $self->serve_rpc($client_sock);
# If "done", the client's closing.
- if ( $method eq 'done' ) {
+ if ( $status eq 'done' ) {
$read_set->remove($client_sock);
$client_sock->close;
next;
}
-
- # Process the method call.
- $dispatch{$method}
- or confess "ERROR: Bad method name: $method\n";
- my $response = $dispatch{$method}->( $self, $args );
- my $frozen = nfreeze($response);
- my $packed_len = pack( 'N', length($frozen) );
- print $client_sock "$packed_len$frozen"
- or confess $!;
-
# Remote signal to close the server.
- if ( $method eq 'terminate' ) {
+ elsif ( $status eq 'terminate' ) {
my @all_handles = $read_set->handles;
$read_set->remove( \@all_handles );
$client_sock->close;
@@ -136,6 +103,40 @@ sub serve {
}
}
+sub serve_rpc {
+ my ( $self, $client_sock ) = @_;
+ my ( $check_val, $buf, $len );
+ $check_val = $client_sock->read( $buf, 4 );
+ # If read returns 0, socket has been closed cleanly at
+ # the other end.
+ return 'done' if $check_val == 0;
+ confess $! unless $check_val == 4;
+ $len = unpack( 'N', $buf );
+ $check_val = $client_sock->read( $buf, $len );
+ confess $! unless $check_val == $len;
+ my $args = eval { thaw($buf) };
+ confess $@ if $@;
+ confess "Not a hashref" unless reftype($args) eq 'HASH';
+ my $method = delete $args->{_action};
+
+ # If "done", the client's closing.
+ return $method if $method eq 'done';
+
+ # Process the method call.
+ $dispatch{$method}
+ or confess "ERROR: Bad method name: $method\n";
+ my $response = $dispatch{$method}->( $self, $args );
+ my $frozen = nfreeze($response);
+ my $packed_len = pack( 'N', length($frozen) );
+ print $client_sock "$packed_len$frozen"
+ or confess $!;
+
+ # Remote signal to close the server.
+ return $method if $method eq 'terminate';
+
+ return 'continue';
+}
+
sub do_handshake {
my ( $self, $args ) = @_;
my $retval = 1;
@@ -196,10 +197,11 @@ LucyX::Remote::SearchServer - Make a Sea
index => '/path/to/index'
);
my $search_server = LucyX::Remote::SearchServer->new(
- searcher => $searcher,
- port => 7890,
+ searcher => $searcher
+ );
+ $search_server->serve(
+ port => 7890
);
- $search_server->serve;
=head1 DESCRIPTION
@@ -218,7 +220,6 @@ distributed across multiple nodes, each
my $search_server = LucyX::Remote::SearchServer->new(
searcher => $searcher, # required
- port => 7890, # required
password => $pass, # optional
);
@@ -233,10 +234,6 @@ will wrap.
=item *
-B<port> - the port on localhost that the server should open and listen on.
-
-=item *
-
B<password> - an optional password which, if supplied, must also be supplied
by clients.
@@ -244,8 +241,26 @@ by clients.
=head2 serve
- $search_server->serve;
+ $search_server->serve(
+ port => 7890, # required
+ );
Open a listening socket on localhost and wait for SearchClients to connect.
+=over
+
+=item *
+
+B<port> - the port on localhost that the server should open and listen on.
+
+=back
+
+=head2 serve_rpc
+
+ my $status = $search_server->serve_rpc($sock);
+
+Handle a single RPC from socket $sock. Returns 'done' if the connection should
+be closed. Returns 'terminate' if the server should shut down. Returns
+'continue' if the server should continue to handle requests from this client.
+
=cut
Modified: incubator/lucy/branches/0.3/perl/t/510-remote_search.t
URL: http://svn.apache.org/viewvc/incubator/lucy/branches/0.3/perl/t/510-remote_search.t?rev=1226609&r1=1226608&r2=1226609&view=diff
==============================================================================
--- incubator/lucy/branches/0.3/perl/t/510-remote_search.t (original)
+++ incubator/lucy/branches/0.3/perl/t/510-remote_search.t Tue Jan 3 02:33:54 2012
@@ -71,11 +71,10 @@ else {
my $searcher = Lucy::Search::IndexSearcher->new( index => $folder );
my $server = LucyX::Remote::SearchServer->new(
- port => $PORT_NUM,
searcher => $searcher,
password => 'foo',
);
- $server->serve;
+ $server->serve( port => $PORT_NUM );
exit(0);
}
Modified: incubator/lucy/branches/0.3/perl/t/550-cluster_searcher.t
URL: http://svn.apache.org/viewvc/incubator/lucy/branches/0.3/perl/t/550-cluster_searcher.t?rev=1226609&r1=1226608&r2=1226609&view=diff
==============================================================================
--- incubator/lucy/branches/0.3/perl/t/550-cluster_searcher.t (original)
+++ incubator/lucy/branches/0.3/perl/t/550-cluster_searcher.t Tue Jan 3 02:33:54 2012
@@ -72,7 +72,7 @@ for my $port (@ports) {
for (qw( a b c )) {
my %doc = (
content => "x $_ $port",
- junk => "xyz " x 4000, # should trigger partial reads
+ junk => "xyz " x 4000, # should trigger partial reads
number => $number,
port => $port,
);
@@ -83,11 +83,10 @@ for my $port (@ports) {
my $searcher = Lucy::Search::IndexSearcher->new( index => $folder );
my $server = LucyX::Remote::SearchServer->new(
- port => $port,
searcher => $searcher,
password => 'foo',
);
- $server->serve;
+ $server->serve( port => $port );
exit(0);
}
}