You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2010/12/10 20:43:22 UTC

svn commit: r1044489 - in /avro/trunk: CHANGES.txt lang/java/src/java/org/apache/avro/ipc/SocketServer.java lang/java/src/java/org/apache/avro/ipc/SocketTransceiver.java

Author: cutting
Date: Fri Dec 10 19:43:21 2010
New Revision: 1044489

URL: http://svn.apache.org/viewvc?rev=1044489&view=rev
Log:
AVRO-704. Java: Fix SocketServer connection threads to exit rather than busywait when client closes connection.

Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/lang/java/src/java/org/apache/avro/ipc/SocketServer.java
    avro/trunk/lang/java/src/java/org/apache/avro/ipc/SocketTransceiver.java

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1044489&r1=1044488&r2=1044489&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Fri Dec 10 19:43:21 2010
@@ -64,6 +64,9 @@ Avro 1.5.0 (unreleased)
 
     AVRO-706. Java: Type promotion not succeeding for long -> float. (thiru)
 
+    AVRO-704. Java: Fix SocketServer connection threads to exit rather
+    than busywait when client closes connection. (cutting)
+
 Avro 1.4.1 (13 October 2010)
 
   NEW FEATURES

Modified: avro/trunk/lang/java/src/java/org/apache/avro/ipc/SocketServer.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/SocketServer.java?rev=1044489&r1=1044488&r2=1044489&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/ipc/SocketServer.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/SocketServer.java Fri Dec 10 19:43:21 2010
@@ -29,6 +29,10 @@ import java.nio.channels.SocketChannel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.avro.Protocol;
+import org.apache.avro.Protocol.Message;
+import org.apache.avro.generic.GenericResponder;
+
 /** A socket-based server implementation. This uses a simple, non-standard wire
  * protocol and is not intended for production services.
  * @deprecated use {@link SaslSocketServer} instead.
@@ -115,7 +119,7 @@ public class SocketServer extends Thread
         } catch (ClosedChannelException e) {
           return;
         } finally {
-          channel.close();
+          xc.close();
         }
       } catch (IOException e) {
         LOG.warn("unexpected error", e);
@@ -125,9 +129,16 @@ public class SocketServer extends Thread
   }
   
   public static void main(String[] arg) throws Exception {
-    SocketServer server = new SocketServer(null, new InetSocketAddress(0));
-    System.out.println("started");
+    Responder responder =
+      new GenericResponder(Protocol.parse("{\"protocol\": \"X\"}")) {
+        public Object respond(Message message, Object request)
+          throws Exception {
+          throw new IOException("no messages!");
+        }
+      };
+    SocketServer server = new SocketServer(responder, new InetSocketAddress(0));
+    server.start();
+    System.out.println("server started on port: "+server.getPort());
     server.join();
   }
 }
-

Modified: avro/trunk/lang/java/src/java/org/apache/avro/ipc/SocketTransceiver.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/src/java/org/apache/avro/ipc/SocketTransceiver.java?rev=1044489&r1=1044488&r2=1044489&view=diff
==============================================================================
--- avro/trunk/lang/java/src/java/org/apache/avro/ipc/SocketTransceiver.java (original)
+++ avro/trunk/lang/java/src/java/org/apache/avro/ipc/SocketTransceiver.java Fri Dec 10 19:43:21 2010
@@ -21,6 +21,7 @@ package org.apache.avro.ipc;
 import java.io.IOException;
 import java.net.SocketAddress;
 import java.nio.channels.SocketChannel;
+import java.nio.channels.ClosedChannelException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
@@ -62,7 +63,8 @@ public class SocketTransceiver extends T
     while (true) {
       header.clear();
       while (header.hasRemaining()) {
-        channel.read(header);
+        if (channel.read(header) < 0)
+          throw new ClosedChannelException();
       }
       header.flip();
       int length = header.getInt();
@@ -71,7 +73,8 @@ public class SocketTransceiver extends T
       }
       ByteBuffer buffer = ByteBuffer.allocate(length);
       while (buffer.hasRemaining()) {
-        channel.read(buffer);
+        if (channel.read(buffer) < 0)
+          throw new ClosedChannelException();
       }
       buffer.flip();
       buffers.add(buffer);