You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2010/12/10 20:43:22 UTC
svn commit: r1044489 - in /avro/trunk: CHANGES.txt
lang/java/src/java/org/apache/avro/ipc/SocketServer.java
lang/java/src/java/org/apache/avro/ipc/SocketTransceiver.java
Author: cutting
Date: Fri Dec 10 19:43:21 2010
New Revision: 1044489
URL: http://svn.apache.org/viewvc?rev=1044489&view=rev
Log:
AVRO-704. Java: Fix SocketServer connection threads to exit rather than busywait when client closes connection.
Modified:
avro/trunk/CHANGES.txt
avro/trunk/lang/java/src/java/org/apache/avro/ipc/SocketServer.java
avro/trunk/lang/java/src/java/org/apache/avro/ipc/SocketTransceiver.java
Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1044489&r1=1044488&r2=1044489&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Fri Dec 10 19:43:21 2010
@@ -64,6 +64,9 @@ Avro 1.5.0 (unreleased)
AVRO-706. Java: Type promotion not succeeding for long -> float. (thiru)
+ AVRO-704. Java: Fix SocketServer connection threads to exit rather
+ than busywait when client closes connection. (cutting)
+
Avro 1.4.1 (13 October 2010)
NEW FEATURES
Modified: avro/trunk/lang/java/src/java/org/apache/avro/ipc/SocketServer.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/SocketServer.java?rev=1044489&r1=1044488&r2=1044489&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/ipc/SocketServer.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/SocketServer.java Fri Dec 10 19:43:21 2010
@@ -29,6 +29,10 @@ import java.nio.channels.SocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.avro.Protocol;
+import org.apache.avro.Protocol.Message;
+import org.apache.avro.generic.GenericResponder;
+
/** A socket-based server implementation. This uses a simple, non-standard wire
* protocol and is not intended for production services.
* @deprecated use {@link SaslSocketServer} instead.
@@ -115,7 +119,7 @@ public class SocketServer extends Thread
} catch (ClosedChannelException e) {
return;
} finally {
- channel.close();
+ xc.close();
}
} catch (IOException e) {
LOG.warn("unexpected error", e);
@@ -125,9 +129,16 @@ public class SocketServer extends Thread
}
public static void main(String[] arg) throws Exception {
- SocketServer server = new SocketServer(null, new InetSocketAddress(0));
- System.out.println("started");
+ Responder responder =
+ new GenericResponder(Protocol.parse("{\"protocol\": \"X\"}")) {
+ public Object respond(Message message, Object request)
+ throws Exception {
+ throw new IOException("no messages!");
+ }
+ };
+ SocketServer server = new SocketServer(responder, new InetSocketAddress(0));
+ server.start();
+ System.out.println("server started on port: "+server.getPort());
server.join();
}
}
-
Modified: avro/trunk/lang/java/src/java/org/apache/avro/ipc/SocketTransceiver.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/SocketTransceiver.java?rev=1044489&r1=1044488&r2=1044489&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/ipc/SocketTransceiver.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/SocketTransceiver.java Fri Dec 10 19:43:21 2010
@@ -21,6 +21,7 @@ package org.apache.avro.ipc;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.channels.SocketChannel;
+import java.nio.channels.ClosedChannelException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@@ -62,7 +63,8 @@ public class SocketTransceiver extends T
while (true) {
header.clear();
while (header.hasRemaining()) {
- channel.read(header);
+ if (channel.read(header) < 0)
+ throw new ClosedChannelException();
}
header.flip();
int length = header.getInt();
@@ -71,7 +73,8 @@ public class SocketTransceiver extends T
}
ByteBuffer buffer = ByteBuffer.allocate(length);
while (buffer.hasRemaining()) {
- channel.read(buffer);
+ if (channel.read(buffer) < 0)
+ throw new ClosedChannelException();
}
buffer.flip();
buffers.add(buffer);