You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2011/06/17 19:55:22 UTC

svn commit: r1136961 - in /avro/branches/branch-1.5: ./ lang/java/compiler/src/main/velocity/org/apache/avro/compiler/specific/templates/java/classic/ lang/java/ipc/src/main/java/org/apache/avro/ipc/ lang/java/ipc/src/main/java/org/apache/avro/ipc/gene...

Author: cutting
Date: Fri Jun 17 17:55:21 2011
New Revision: 1136961

URL: http://svn.apache.org/viewvc?rev=1136961&view=rev
Log:
Merge r1099257 and r1136342 from trunk.  Fixes: AVRO-815 and AVRO-539.

Added:
    avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/CallFuture.java
      - copied unchanged from r1136342, avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/CallFuture.java
    avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/Callback.java
      - copied unchanged from r1136342, avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Callback.java
    avro/branches/branch-1.5/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolNetty.java
      - copied unchanged from r1136342, avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolNetty.java
    avro/branches/branch-1.5/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCallbacks.java
      - copied unchanged from r1136342, avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCallbacks.java
Modified:
    avro/branches/branch-1.5/   (props changed)
    avro/branches/branch-1.5/CHANGES.txt
    avro/branches/branch-1.5/lang/java/compiler/src/main/velocity/org/apache/avro/compiler/specific/templates/java/classic/protocol.vm
    avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyServer.java
    avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java
    avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/Requestor.java
    avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/Responder.java
    avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/Transceiver.java
    avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/generic/GenericRequestor.java
    avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/specific/SpecificRequestor.java
    avro/branches/branch-1.5/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServer.java
    avro/branches/branch-1.5/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestRpcPluginOrdering.java
    avro/branches/branch-1.5/share/test/schemas/mail.avpr

Propchange: avro/branches/branch-1.5/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jun 17 17:55:21 2011
@@ -1 +1 @@
-/avro/trunk:1075938,1075993,1078917,1079055,1079060,1079063,1083246,1085921,1086727,1086730,1086866,1087076,1087129,1087136,1087439-1087440,1087463,1087472,1087792,1089128,1089131,1089550,1094812,1095206-1095208,1095493,1095529,1095548,1095550,1096798,1097916,1097927,1097968,1097974,1102332,1102335,1124127,1124971,1129053,1129071,1129697-1129706,1129729,1130503
+/avro/trunk:1075938,1075993,1078917,1079055,1079060,1079063,1083246,1085921,1086727,1086730,1086866,1087076,1087129,1087136,1087439-1087440,1087463,1087472,1087792,1089128,1089131,1089550,1094812,1095206-1095208,1095493,1095529,1095548,1095550,1096798,1097916,1097927,1097968,1097974,1099257,1102332,1102335,1124127,1124971,1129053,1129071,1129697-1129706,1129729,1130503,1136342

Modified: avro/branches/branch-1.5/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/CHANGES.txt?rev=1136961&r1=1136960&r2=1136961&view=diff
==============================================================================
--- avro/branches/branch-1.5/CHANGES.txt (original)
+++ avro/branches/branch-1.5/CHANGES.txt Fri Jun 17 17:55:21 2011
@@ -8,6 +8,9 @@ Avro 1.5.2 (unreleased)
     streaming jobs to easily write Avro format output with "bytes" as
     schema.  (Tom White via cutting)
 
+    AVRO-539. Java: Add asynchronous RPC support, through either
+    callbacks or futures.  (James Baldassari via cutting)
+
   IMPROVEMENTS
 
     AVRO-469. C: Set library's libtool-style soversion when using CMake
@@ -34,6 +37,10 @@ Avro 1.5.2 (unreleased)
     AVRO-832. Java: Fix RPC client to correctly perform schema
     resolution on message responses.  (cutting)
 
+    AVRO-815. Java: Netty Transceiver fails processing one-way messages.
+    Implemented writeBuffers for the NettyTransceiver to allow it to
+    process one-way messages.
+
 Avro 1.5.1 (3 May 2011)
 
   NEW FEATURES

Modified: avro/branches/branch-1.5/lang/java/compiler/src/main/velocity/org/apache/avro/compiler/specific/templates/java/classic/protocol.vm
URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/java/compiler/src/main/velocity/org/apache/avro/compiler/specific/templates/java/classic/protocol.vm?rev=1136961&r1=1136960&r2=1136961&view=diff
==============================================================================
--- avro/branches/branch-1.5/lang/java/compiler/src/main/velocity/org/apache/avro/compiler/specific/templates/java/classic/protocol.vm (original)
+++ avro/branches/branch-1.5/lang/java/compiler/src/main/velocity/org/apache/avro/compiler/specific/templates/java/classic/protocol.vm Fri Jun 17 17:55:21 2011
@@ -21,7 +21,7 @@ package $protocol.getNamespace();
 
 @SuppressWarnings("all")
 #if ($protocol.getDoc())
-  /** $protocol.getDoc() */
+/** $protocol.getDoc() */
 #end
 public interface $this.mangle($protocol.getName()) {
   public static final org.apache.avro.Protocol PROTOCOL = org.apache.avro.Protocol.parse("${this.javaEscape($protocol.toString())}");
@@ -46,4 +46,31 @@ public interface $this.mangle($protocol.
 #end##    (one way)
 ;
 #end## (requests)
-}
+
+## Generate nested callback API
+  @SuppressWarnings("all")
+#if ($protocol.getDoc())
+  /** $protocol.getDoc() */
+#end
+  public interface Callback extends $this.mangle($protocol.getName()) {
+    public static final org.apache.avro.Protocol PROTOCOL = #if ($protocol.getNamespace())$protocol.getNamespace().#end${this.mangle($protocol.getName())}.PROTOCOL;
+#foreach ($e in $protocol.getMessages().entrySet())
+#set ($name = $e.getKey())
+#set ($message = $e.getValue())
+#set ($response = $message.getResponse())
+## Generate callback method if the message is not one-way:
+#if (! $message.isOneWay())
+#if ($message.getDoc())
+    /** $this.escapeForJavadoc($message.getDoc()) */
+#end
+    void ${this.mangle($name)}(##
+#foreach ($p in $message.getRequest().getFields())##
+#*      *#${this.javaUnbox($p.schema())} ${this.mangle($p.name())}#if ($velocityHasNext), #end
+#end
+#if ($message.getRequest().getFields().size() > 0), #end
+org.apache.avro.ipc.Callback<${this.javaType($response)}> callback) throws java.io.IOException;
+#end## (generate callback method)
+#end## (requests)
+  }## End of Callback interface
+
+}## End of protocol interface

Modified: avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyServer.java
URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyServer.java?rev=1136961&r1=1136960&r2=1136961&view=diff
==============================================================================
--- avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyServer.java (original)
+++ avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyServer.java Fri Jun 17 17:55:21 2011
@@ -54,13 +54,13 @@ public class NettyServer implements Serv
   private static final Logger LOG = LoggerFactory.getLogger(NettyServer.class
       .getName());
 
-  private Responder responder;
+  private final Responder responder;
 
-  private Channel serverChannel;
-  private ChannelGroup allChannels = new DefaultChannelGroup(
+  private final Channel serverChannel;
+  private final ChannelGroup allChannels = new DefaultChannelGroup(
       "avro-netty-server");
-  private ChannelFactory channelFactory;
-  private CountDownLatch closed = new CountDownLatch(1);
+  private final ChannelFactory channelFactory;
+  private final CountDownLatch closed = new CountDownLatch(1);
   
   public NettyServer(Responder responder, InetSocketAddress addr) {
     this.responder = responder;
@@ -133,14 +133,13 @@ public class NettyServer implements Serv
         NettyDataPack dataPack = (NettyDataPack) e.getMessage();
         List<ByteBuffer> req = dataPack.getDatas();
         List<ByteBuffer> res = responder.respond(req, connectionMetadata);
-        dataPack.setDatas(res);
-        e.getChannel().write(dataPack);
+        // Response will be null for one-way messages.
+        if(res != null) {
+          dataPack.setDatas(res);
+          e.getChannel().write(dataPack);          
+        }
       } catch (IOException ex) {
         LOG.warn("unexpect error");
-      } finally {
-        if(!connectionMetadata.isConnected()){
-          e.getChannel().close();
-        }
       }
     }
 

Modified: avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java
URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java?rev=1136961&r1=1136960&r2=1136961&view=diff
==============================================================================
--- avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java (original)
+++ avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java Fri Jun 17 17:55:21 2011
@@ -27,10 +27,8 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.avro.Protocol;
@@ -45,6 +43,7 @@ import org.jboss.netty.channel.ChannelFu
 import org.jboss.netty.channel.ChannelHandlerContext;
 import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.ChannelState;
 import org.jboss.netty.channel.ChannelStateEvent;
 import org.jboss.netty.channel.Channels;
 import org.jboss.netty.channel.ExceptionEvent;
@@ -61,26 +60,39 @@ public class NettyTransceiver extends Tr
   private static final Logger LOG = LoggerFactory.getLogger(NettyTransceiver.class
       .getName());
 
-  private ChannelFactory channelFactory;
-  private Channel channel;
+  private final AtomicInteger serialGenerator = new AtomicInteger(0);
+  private final Map<Integer, Callback<List<ByteBuffer>>> requests = 
+    new ConcurrentHashMap<Integer, Callback<List<ByteBuffer>>>();
   
-  private AtomicInteger serialGenerator = new AtomicInteger(0);
-  private Map<Integer, CallFuture> requests = 
-    new ConcurrentHashMap<Integer, CallFuture>();
+  private final ChannelFactory channelFactory;
+  private final ClientBootstrap bootstrap;
+  private final InetSocketAddress remoteAddr;
   
-  private Protocol remote;
+  /**
+   * Read lock must be acquired whenever using non-final state.
+   * Write lock must be acquired whenever modifying state.
+   */
+  private final ReadWriteLock stateLock = new ReentrantReadWriteLock();
+  private boolean open = false;  // Synchronized on stateLock
+  private Channel channel;       // Synchronized on stateLock
+  private Protocol remote;       // Synchronized on stateLock
+
+  NettyTransceiver() {
+    channelFactory = null;
+    bootstrap = null;
+    remoteAddr = null;
+  }
 
-  NettyTransceiver() {}
-  
   public NettyTransceiver(InetSocketAddress addr) {
-    this(addr, new NioClientSocketChannelFactory(Executors.
-        newCachedThreadPool(), Executors.newCachedThreadPool()));
+    this(addr, new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), 
+        Executors.newCachedThreadPool()));
   }
 
   public NettyTransceiver(InetSocketAddress addr, ChannelFactory channelFactory) {
     // Set up.
     this.channelFactory = channelFactory;
-    ClientBootstrap bootstrap = new ClientBootstrap(channelFactory);
+    bootstrap = new ClientBootstrap(channelFactory);
+    remoteAddr = addr;
 
     // Configure the event pipeline factory.
     bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@@ -97,54 +109,160 @@ public class NettyTransceiver extends Tr
     bootstrap.setOption("tcpNoDelay", true);
 
     // Make a new connection.
-    ChannelFuture channelFuture = bootstrap.connect(addr);
-    channelFuture.awaitUninterruptibly();
-    if (!channelFuture.isSuccess()) {
-      channelFuture.getCause().printStackTrace();
-      throw new RuntimeException(channelFuture.getCause());
+    connect();
+  }
+  
+  /**
+   * Connects to the remote peer if not already connected.
+   */
+  private void connect() {
+    stateLock.writeLock().lock();
+    try {
+      if (!open || (channel == null) || !channel.isOpen() || !channel.isBound() || !channel.isConnected()) {
+        LOG.info("Connecting to " + remoteAddr);
+        ChannelFuture channelFuture = bootstrap.connect(remoteAddr);
+        channelFuture.awaitUninterruptibly();
+        if (!channelFuture.isSuccess()) {
+          channelFuture.getCause().printStackTrace();
+          throw new RuntimeException(channelFuture.getCause());
+        }
+        channel = channelFuture.getChannel();
+        open = true;
+      }
+    } finally {
+      stateLock.writeLock().unlock();
+    }
+  }
+  
+  /**
+   * Closes the connection to the remote peer if connected.
+   */
+  private void disconnect() {
+    disconnect(false);
+  }
+  
+  /**
+   * Closes the connection to the remote peer if connected.
+   * @param awaitCompletion if true, will block until the close has completed.
+   */
+  private void disconnect(boolean awaitCompletion) {
+    stateLock.writeLock().lock();
+    try {
+      if (channel != null) {
+        LOG.info("Disconnecting from " + remoteAddr);
+        ChannelFuture closeFuture = channel.close();
+        if (awaitCompletion) {
+          closeFuture.awaitUninterruptibly();
+        }
+        channel = null;
+        remote = null;
+        open = false;
+      }
+    } finally {
+      stateLock.writeLock().unlock();
     }
-    channel = channelFuture.getChannel();
+  }
+  
+  /**
+   * Netty channels are thread-safe, so there is no need to acquire locks.
+   * This method is a no-op.
+   */
+  @Override
+  public void lockChannel() {
+    
+  }
+  
+  /**
+   * Netty channels are thread-safe, so there is no need to acquire locks.
+   * This method is a no-op.
+   */
+  @Override
+  public void unlockChannel() {
+    
   }
 
   public void close() {
-    // Close the connection.
-    channel.close().awaitUninterruptibly();
-    // Shut down all thread pools to exit.
-    channelFactory.releaseExternalResources();
+    stateLock.writeLock().lock();
+    try {
+      // Close the connection.
+      disconnect(true);
+      // Shut down all thread pools to exit.
+      channelFactory.releaseExternalResources();
+    } finally {
+      stateLock.writeLock().unlock();
+    }
   }
 
   @Override
   public String getRemoteName() {
-    return channel.getRemoteAddress().toString();
+    stateLock.readLock().lock();
+    try {
+      return channel.getRemoteAddress().toString();
+    } finally {
+      stateLock.readLock().unlock();
+    }
   }
 
   /**
    * Override as non-synchronized method because the method is thread safe.
    */
   @Override
-  public List<ByteBuffer> transceive(List<ByteBuffer> request)
-      throws IOException {
-    int serial = serialGenerator.incrementAndGet();
-    NettyDataPack dataPack = new NettyDataPack(serial, request);
-    CallFuture callFuture = new CallFuture();
-    requests.put(serial, callFuture);
-    channel.write(dataPack);
+  public List<ByteBuffer> transceive(List<ByteBuffer> request) {
     try {
-      return callFuture.get();
+      CallFuture<List<ByteBuffer>> transceiverFuture = new CallFuture<List<ByteBuffer>>();
+      transceive(request, transceiverFuture);
+      return transceiverFuture.get();
     } catch (InterruptedException e) {
       LOG.warn("failed to get the response", e);
       return null;
     } catch (ExecutionException e) {
       LOG.warn("failed to get the response", e);
       return null;
+    }
+  }
+  
+  @Override
+  public void transceive(List<ByteBuffer> request, Callback<List<ByteBuffer>> callback) {
+    stateLock.readLock().lock();
+    try {
+      int serial = serialGenerator.incrementAndGet();
+      NettyDataPack dataPack = new NettyDataPack(serial, request);
+      requests.put(serial, callback);
+      writeDataPack(dataPack);
     } finally {
-      requests.remove(serial);
+      stateLock.readLock().unlock();
     }
   }
-
+  
   @Override
   public void writeBuffers(List<ByteBuffer> buffers) throws IOException {
-    throw new UnsupportedOperationException();
+    writeDataPack(new NettyDataPack(serialGenerator.incrementAndGet(), buffers));
+  }
+  
+  /**
+   * Writes a NettyDataPack, reconnecting to the remote peer if necessary.
+   * @param dataPack the data pack to write.
+   */
+  private void writeDataPack(NettyDataPack dataPack) {
+    stateLock.readLock().lock();
+    try {
+      while ((channel == null) || !channel.isOpen() || !channel.isBound() || !channel.isConnected()) {
+        // Need to reconnect
+        // Upgrade to write lock
+        stateLock.readLock().unlock();
+        stateLock.writeLock().lock();
+        try {
+          connect();
+        } finally {
+          // Downgrade to read lock:
+          stateLock.readLock().lock();
+          stateLock.writeLock().unlock();
+        }
+      }
+      channel.write(dataPack);
+    } finally {
+      stateLock.readLock().unlock();
+    }
   }
 
   @Override
@@ -154,71 +272,32 @@ public class NettyTransceiver extends Tr
   
   @Override
   public Protocol getRemote() {
-    return remote;
+    stateLock.readLock().lock();
+    try {
+      return remote;
+    } finally {
+      stateLock.readLock().unlock();
+    }
   }
 
   @Override
   public boolean isConnected() {
-    return remote!=null;
+    stateLock.readLock().lock();
+    try {
+      return remote!=null;
+    } finally {
+      stateLock.readLock().unlock();
+    }
   }
 
   @Override
   public void setRemote(Protocol protocol) {
-    this.remote = protocol;
-  }
-
-  /**
-   * Future class for a RPC call
-   */
-  class CallFuture implements Future<List<ByteBuffer>>{
-    private Semaphore sem = new Semaphore(0);
-    private List<ByteBuffer> response = null;
-    
-    public void setResponse(List<ByteBuffer> response) {
-      this.response = response;
-      sem.release();
-    }
-    
-    public void releaseSemphore() {
-      sem.release();
-    }
-
-    public List<ByteBuffer> getResponse() {
-      return response;
-    }
-
-    @Override
-    public boolean cancel(boolean mayInterruptIfRunning) {
-      return false;
-    }
-
-    @Override
-    public boolean isCancelled() {
-      return false;
-    }
-
-    @Override
-    public List<ByteBuffer> get() throws InterruptedException,
-        ExecutionException {
-      sem.acquire();
-      return response;
-    }
-
-    @Override
-    public List<ByteBuffer> get(long timeout, TimeUnit unit)
-        throws InterruptedException, ExecutionException, TimeoutException {
-      if (sem.tryAcquire(timeout, unit)) {
-        return response;
-      } else {
-        throw new TimeoutException();
-      }
-    }
-
-    @Override
-    public boolean isDone() {
-      return sem.availablePermits()>0;
+    stateLock.writeLock().lock();
+    try {
+      this.remote = protocol;
+    } finally {
+      stateLock.writeLock().unlock();
     }
-    
   }
 
   /**
@@ -231,6 +310,33 @@ public class NettyTransceiver extends Tr
         throws Exception {
       if (e instanceof ChannelStateEvent) {
         LOG.info(e.toString());
+        ChannelStateEvent cse = (ChannelStateEvent)e;
+        if ((cse.getState() == ChannelState.OPEN) && (Boolean.FALSE.equals(cse.getValue()))) {
+          // Server closed connection; disconnect client side
+          LOG.info("Remote peer " + remoteAddr + " closed connection.");
+          stateLock.readLock().lock();
+          boolean readLockAcquired = true;
+          try {
+            // Only disconnect if open to prevent deadlock on close()
+            if (open) {
+              // Upgrade to write lock:
+              stateLock.readLock().unlock();
+              readLockAcquired = false;
+              stateLock.writeLock().lock();
+              try {
+                if (open) {
+                  disconnect();
+                }
+              } finally {
+                stateLock.writeLock().unlock();
+              }
+            }
+          } finally {
+            if (readLockAcquired) {
+              stateLock.readLock().unlock();
+            }
+          }
+        }
       }
       super.handleUpstream(ctx, e);
     }
@@ -245,11 +351,15 @@ public class NettyTransceiver extends Tr
     @Override
     public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e) {
       NettyDataPack dataPack = (NettyDataPack)e.getMessage();
-      CallFuture callFuture = requests.get(dataPack.getSerial());
-      if (callFuture==null) {
+      Callback<List<ByteBuffer>> callback = requests.get(dataPack.getSerial());
+      if (callback==null) {
         throw new RuntimeException("Missing previous call info");
       }
-      callFuture.setResponse(dataPack.getDatas());
+      try {
+        callback.handleResult(dataPack.getDatas());
+      } finally {
+        requests.remove(dataPack.getSerial());
+      }
     }
 
     @Override
@@ -257,9 +367,9 @@ public class NettyTransceiver extends Tr
       LOG.warn("Unexpected exception from downstream.", e.getCause());
       e.getChannel().close();
       // let the blocking waiting exit
-      Iterator<CallFuture> it = requests.values().iterator();
+      Iterator<Callback<List<ByteBuffer>>> it = requests.values().iterator();
       while(it.hasNext()) {
-        it.next().releaseSemphore();
+        it.next().handleError(e.getCause());
         it.remove();
       }
       

Modified: avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/Requestor.java
URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/Requestor.java?rev=1136961&r1=1136960&r2=1136961&view=diff
==============================================================================
--- avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/Requestor.java (original)
+++ avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/Requestor.java Fri Jun 17 17:55:21 2011
@@ -20,9 +20,11 @@ package org.apache.avro.ipc;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.List;
 import java.util.Map;
 
@@ -57,12 +59,13 @@ public abstract class Requestor {
   private static final GenericDatumWriter<Map<CharSequence,ByteBuffer>>
     META_WRITER = new GenericDatumWriter<Map<CharSequence,ByteBuffer>>(META);
 
-  private Protocol local;
-  private Protocol remote;
-  private boolean sendLocalText;
-  private Transceiver transceiver;
+  private final Protocol local;
+  private volatile Protocol remote;
+  private volatile boolean sendLocalText;
+  private final Transceiver transceiver;
+  private final ReentrantLock handshakeLock = new ReentrantLock();
   
-  protected List<RPCPlugin> rpcMetaPlugins;
+  protected final List<RPCPlugin> rpcMetaPlugins;
 
   public Protocol getLocal() { return local; }
   public Transceiver getTransceiver() { return transceiver; }
@@ -72,7 +75,7 @@ public abstract class Requestor {
     this.local = local;
     this.transceiver = transceiver;
     this.rpcMetaPlugins =
-      Collections.synchronizedList(new ArrayList<RPCPlugin>());
+      new CopyOnWriteArrayList<RPCPlugin>();
   }
   
   /**
@@ -85,93 +88,96 @@ public abstract class Requestor {
   }
 
   private static final EncoderFactory ENCODER_FACTORY = new EncoderFactory();
-  private BinaryEncoder encoder = 
-    ENCODER_FACTORY.binaryEncoder(new ByteBufferOutputStream(), null);
   
   /** Writes a request message and reads a response or error message. */
-  public synchronized Object request(String messageName, Object request)
+  public Object request(String messageName, Object request)
     throws Exception {
-    Transceiver t = getTransceiver();
-    BinaryDecoder in = null;
-    Message m;
-    RPCContext context = new RPCContext();
-    do {
-      ByteBufferOutputStream bbo = new ByteBufferOutputStream();
-      //safe to use encoder because this is synchronized
-      BinaryEncoder out = ENCODER_FACTORY.binaryEncoder(bbo, encoder);
-      
-      // use local protocol to write request
-      m = getLocal().getMessages().get(messageName);
-      if (m == null)
-        throw new AvroRuntimeException("Not a local message: "+messageName);
-      context.setMessage(m);
+    // Initialize request
+    Request rpcRequest = new Request(messageName, request, new RPCContext());
+    CallFuture<Object> future = /* only need a Future for two-way messages */
+      rpcRequest.getMessage().isOneWay() ? null : new CallFuture<Object>();
     
-      writeRequest(m.getRequest(), request, out); // write request payload
-      
-      out.flush();
-      List<ByteBuffer> payload = bbo.getBufferList();
-      
-      writeHandshake(out);                       // prepend handshake if needed
-      
-      context.setRequestPayload(payload);
-      for (RPCPlugin plugin : rpcMetaPlugins) {
-        plugin.clientSendRequest(context);        // get meta-data from plugins
+    // Send request
+    request(rpcRequest, future);
+    
+    if (future == null)        // the message is one-way, so return immediately
+      return null;
+    try {                      // the message is two-way, wait for the result
+      return future.get();
+    } catch (ExecutionException e) {
+      if (e.getCause() instanceof Exception) {
+        throw (Exception)e.getCause();
+      } else {
+        throw new RuntimeException(e.getCause());
       }
-      META_WRITER.write(context.requestCallMeta(), out);
-
-      out.writeString(m.getName());               // write message name
-
-      out.flush();
-      bbo.append(payload);
-      
-      List<ByteBuffer> requestBytes = bbo.getBufferList();
-
-      if (m.isOneWay() && t.isConnected()) {      // send one-way message
-        t.writeBuffers(requestBytes);
-        
-        return null;
-      } else {                                    // two-way message
-        List<ByteBuffer> response = t.transceive(requestBytes);
-        ByteBufferInputStream bbi = new ByteBufferInputStream(response);
-        in = DecoderFactory.get().binaryDecoder(bbi, in);
+    }
+  }
+  
+  /**
+   * Writes a request message and returns the result through a Callback.
+   * Clients can also use a Future interface by creating a new CallFuture<T>,
+   * passing it in as the Callback parameter, and then waiting on that Future.
+   * @param <T> the return type of the message.
+   * @param messageName the name of the message to invoke.
+   * @param request the request data to send.
+   * @param callback the callback which will be invoked when the response is returned 
+   * or an error occurs.
+   * @throws Exception if an error occurs sending the message.
+   */
+  public <T> void request(String messageName, Object request, Callback<T> callback) 
+    throws Exception {
+    request(new Request(messageName, request, new RPCContext()), callback);
+  }
+  
+  /** Writes a request message and returns the result through a Callback. */
+  <T> void request(Request request, Callback<T> callback)
+    throws Exception {
+    Transceiver t = getTransceiver();
+    if (!t.isConnected()) {
+      // Acquire handshake lock so that only one thread is performing the
+      // handshake and other threads block until the handshake is completed
+      handshakeLock.lock();
+      try {
+        if (t.isConnected()) {
+          // Another thread already completed the handshake; no need to hold
+          // the handshake lock
+          handshakeLock.unlock();
+        } else {
+          CallFuture<T> callFuture = new CallFuture<T>(callback);
+          t.transceive(request.getBytes(),
+                       new TransceiverCallback<T>(request, callFuture));
+          // Block until handshake complete
+          callFuture.await();
+          return;
+        }
+      } finally{
+        if (handshakeLock.isHeldByCurrentThread()) {
+          handshakeLock.unlock();
+        }
       }
-    } while (!readHandshake(in));
-
-    // use remote protocol to read response
-    Message rm = remote.getMessages().get(messageName);
-    if (rm == null)
-      throw new AvroRuntimeException("Not a remote message: "+messageName);
-
-    if ((m.isOneWay() != rm.isOneWay()) && t.isConnected())
-      throw new AvroRuntimeException("Not both one-way messages: "+messageName);
-
-    if (m.isOneWay() && t.isConnected()) return null; // one-way w/ handshake
-        
-    context.setResponseCallMeta(META_READER.read(null, in));
-    
-    if (!in.readBoolean()) {                      // no error
-      Object response = readResponse(rm.getResponse(), m.getResponse(), in);
-      context.setResponse(response);
-      for (RPCPlugin plugin : rpcMetaPlugins) {
-        plugin.clientReceiveResponse(context);
+    }
+    
+    if (request.getMessage().isOneWay()) {
+      t.lockChannel();
+      try {
+        t.writeBuffers(request.getBytes());
+        if (callback != null) {
+          callback.handleResult(null);
+        }
+      } finally {
+        t.unlockChannel();
       }
-      return response;
-      
     } else {
-      Exception error = readError(rm.getErrors(), m.getErrors(), in);
-      context.setError(error);
-      for (RPCPlugin plugin : rpcMetaPlugins) {
-        plugin.clientReceiveResponse(context);
-      }
-      throw error;
+      t.transceive(request.getBytes(),
+                   new TransceiverCallback<T>(request, callback));
     }
     
   }
 
-  private static final Map<String,MD5> REMOTE_HASHES =
-    Collections.synchronizedMap(new HashMap<String,MD5>());
-  private static final Map<MD5,Protocol> REMOTE_PROTOCOLS =
-    Collections.synchronizedMap(new HashMap<MD5,Protocol>());
+  private static final ConcurrentMap<String,MD5> REMOTE_HASHES =
+    new ConcurrentHashMap<String,MD5>();
+  private static final ConcurrentMap<MD5,Protocol> REMOTE_PROTOCOLS =
+    new ConcurrentHashMap<MD5,Protocol>();
 
   private static final SpecificDatumWriter<HandshakeRequest> HANDSHAKE_WRITER =
     new SpecificDatumWriter<HandshakeRequest>(HandshakeRequest.class);
@@ -185,10 +191,11 @@ public abstract class Requestor {
     localHash.bytes(local.getMD5());
     String remoteName = transceiver.getRemoteName();
     MD5 remoteHash = REMOTE_HASHES.get(remoteName);
-    remote = REMOTE_PROTOCOLS.get(remoteHash);
     if (remoteHash == null) {                     // guess remote is local
       remoteHash = localHash;
       remote = local;
+    } else {
+      remote = REMOTE_PROTOCOLS.get(remoteHash);
     }
     HandshakeRequest handshake = new HandshakeRequest();
     handshake.clientHash = localHash;
@@ -244,30 +251,36 @@ public abstract class Requestor {
     remote = Protocol.parse(handshake.serverProtocol.toString());
     MD5 remoteHash = (MD5)handshake.serverHash;
     REMOTE_HASHES.put(transceiver.getRemoteName(), remoteHash);
-    if (!REMOTE_PROTOCOLS.containsKey(remoteHash))
-      REMOTE_PROTOCOLS.put(remoteHash, remote);
+    REMOTE_PROTOCOLS.putIfAbsent(remoteHash, remote);
   }
 
   /** Return the remote protocol.  Force a handshake if required. */
-  public synchronized Protocol getRemote() throws IOException {
+  public Protocol getRemote() throws IOException {
     if (remote != null) return remote;            // already have it
     MD5 remoteHash = REMOTE_HASHES.get(transceiver.getRemoteName());
-    remote = REMOTE_PROTOCOLS.get(remoteHash);
-    if (remote != null) return remote;            // already cached
-    // force handshake
-    ByteBufferOutputStream bbo = new ByteBufferOutputStream();
-    // direct because the payload is tiny.
-    Encoder out = ENCODER_FACTORY.directBinaryEncoder(bbo, null);
-    writeHandshake(out);
-    out.writeInt(0);                              // empty metadata
-    out.writeString("");                          // bogus message name
-    List<ByteBuffer> response =
-      getTransceiver().transceive(bbo.getBufferList());
-    ByteBufferInputStream bbi = new ByteBufferInputStream(response);
-    BinaryDecoder in =
-      DecoderFactory.get().binaryDecoder(bbi, null);
-    readHandshake(in);
-    return this.remote;
+    if (remoteHash != null) {
+      remote = REMOTE_PROTOCOLS.get(remoteHash);
+      if (remote != null) return remote;            // already cached
+    }
+    handshakeLock.lock();
+    try {
+      // force handshake
+      ByteBufferOutputStream bbo = new ByteBufferOutputStream();
+      // direct because the payload is tiny.
+      Encoder out = ENCODER_FACTORY.directBinaryEncoder(bbo, null);
+      writeHandshake(out);
+      out.writeInt(0);                              // empty metadata
+      out.writeString("");                          // bogus message name
+      List<ByteBuffer> response =
+        getTransceiver().transceive(bbo.getBufferList());
+      ByteBufferInputStream bbi = new ByteBufferInputStream(response);
+      BinaryDecoder in =
+        DecoderFactory.get().binaryDecoder(bbi, null);
+      readHandshake(in);
+      return this.remote;
+    } finally {
+      handshakeLock.unlock();
+    }
   }
 
 
@@ -292,5 +305,249 @@ public abstract class Requestor {
   /** Reads an error message. */
   public abstract Exception readError(Schema writer, Schema reader, Decoder in)
     throws IOException;
-}
+  
+  /**
+   * Handles callbacks from transceiver invocations.
+   */
+  protected class TransceiverCallback<T> implements Callback<List<ByteBuffer>> {
+    private final Request request;
+    private final Callback<T> callback;
+    
+    /**
+     * Creates a TransceiverCallback.
+     * @param request the request to set.
+     * @param callback the callback to set.
+     */
+    public TransceiverCallback(Request request, Callback<T> callback) {
+      this.request = request;
+      this.callback = callback;
+    }
+    
+    @Override
+    @SuppressWarnings("unchecked")
+    public void handleResult(List<ByteBuffer> responseBytes) {
+      ByteBufferInputStream bbi = new ByteBufferInputStream(responseBytes);
+      BinaryDecoder in = DecoderFactory.get().binaryDecoder(bbi, null);
+      try {
+        if (!readHandshake(in)) {
+          // Resend the handshake and return
+          Request handshake = new Request(request);
+          getTransceiver().transceive
+            (handshake.getBytes(),
+             new TransceiverCallback<T>(handshake, callback));
+          return;
+        }
+      } catch (Exception e) {
+        LOG.error("Error handling transceiver callback: " + e, e);
+      }
+      
+      // Read response; invoke callback
+      Response response = new Response(request, in);
+      Object responseObject;
+      try {
+        try {
+          responseObject = response.getResponse();
+        } catch (Exception e) {
+          if (callback != null) {
+            callback.handleError(e);
+          }
+          return;
+        }
+        if (callback != null) {
+          callback.handleResult((T)responseObject);
+        }
+      } catch (Throwable t) {
+        LOG.error("Error in callback handler: " + t, t);
+      }
+    }
+    
+    @Override
+    public void handleError(Throwable error) {
+      callback.handleError(error);
+    }
+  }
+  
+  /**
+   * Encapsulates/generates a request.
+   */
+  class Request {
+    private final String messageName;
+    private final Object request;
+    private final RPCContext context;
+    private final BinaryEncoder encoder;
+    private Message message;
+    private List<ByteBuffer> requestBytes;
+    
+    /**
+     * Creates a Request.
+     * @param messageName the name of the message to invoke.
+     * @param request the request data to send.
+     * @param context the RPC context to use.
+     */
+    public Request(String messageName, Object request, RPCContext context) {
+      this(messageName, request, context, null);
+    }
+    
+    /**
+     * Creates a Request.
+     * @param messageName the name of the message to invoke.
+     * @param request the request data to send.
+     * @param context the RPC context to use.
+     * @param encoder the BinaryEncoder to use to serialize the request.
+     */
+    public Request(String messageName, Object request, RPCContext context,
+                   BinaryEncoder encoder) {
+      this.messageName = messageName;
+      this.request = request;
+      this.context = context;
+      this.encoder =
+        ENCODER_FACTORY.binaryEncoder(new ByteBufferOutputStream(), encoder);
+    }
+    
+    /**
+     * Copy constructor.
+     * @param other Request from which to copy fields.
+     */
+    public Request(Request other) {
+      this.messageName = other.messageName;
+      this.request = other.request;
+      this.context = other.context;
+      this.encoder = other.encoder;
+    }
+    
+    /**
+     * Gets the message name.
+     * @return the message name.
+     */
+    public String getMessageName() {
+      return messageName;
+    }
+    
+    /**
+     * Gets the RPC context.
+     * @return the RPC context.
+     */
+    public RPCContext getContext() {
+      return context;
+    }
+    
+    /**
+     * Gets the Message associated with this request.
+     * @return this request's message.
+     */
+    public Message getMessage() {
+      if (message == null) {
+        message = getLocal().getMessages().get(messageName);
+        if (message == null) {
+          throw new AvroRuntimeException("Not a local message: "+messageName);
+        }
+      }
+      return message;
+    }
+    
+    /**
+     * Gets the request data, generating it first if necessary.
+     * @return the request data.
+     * @throws Exception if an error occurs generating the request data.
+     */
+    public List<ByteBuffer> getBytes() 
+      throws Exception {
+      if (requestBytes == null) {
+        ByteBufferOutputStream bbo = new ByteBufferOutputStream();
+        BinaryEncoder out = ENCODER_FACTORY.binaryEncoder(bbo, encoder);
+
+        // use local protocol to write request
+        Message m = getMessage();
+        context.setMessage(m);
+
+        writeRequest(m.getRequest(), request, out); // write request payload
+
+        out.flush();
+        List<ByteBuffer> payload = bbo.getBufferList();
+
+        writeHandshake(out);                     // prepend handshake if needed
+
+        context.setRequestPayload(payload);
+        for (RPCPlugin plugin : rpcMetaPlugins) {
+          plugin.clientSendRequest(context);      // get meta-data from plugins
+        }
+        META_WRITER.write(context.requestCallMeta(), out);
 
+        out.writeString(m.getName());             // write message name
+
+        out.flush();
+        bbo.append(payload);
+
+        requestBytes = bbo.getBufferList();
+      }
+      return requestBytes;
+    }
+  }
+  
+  /**
+   * Encapsulates/parses a response.
+   */
+  class Response {
+    private final Request request;
+    private final BinaryDecoder in;
+    
+    /**
+     * Creates a Response.
+     * @param request the Request associated with this response.
+     */
+    public Response(Request request) {
+      this(request, null);
+    }
+    
+    /**
+     * Creates a Response.
+     * @param request the Request associated with this response.
+     * @param in the BinaryDecoder to use to deserialize the response.
+     */
+    public Response(Request request, BinaryDecoder in) {
+      this.request = request;
+      this.in = in;
+    }
+    
+    /**
+     * Gets the RPC response, reading/deserializing it first if necessary.
+     * @return the RPC response.
+     * @throws Exception if an error occurs reading/deserializing the response.
+     */
+    public Object getResponse() 
+      throws Exception {
+      Message lm = request.getMessage();
+      Message rm = remote.getMessages().get(request.getMessageName());
+      if (rm == null)
+        throw new AvroRuntimeException
+          ("Not a remote message: "+request.getMessageName());
+
+      Transceiver t = getTransceiver();
+      if ((lm.isOneWay() != rm.isOneWay()) && t.isConnected())
+        throw new AvroRuntimeException
+          ("Not both one-way messages: "+request.getMessageName());
+
+      if (lm.isOneWay() && t.isConnected()) return null; // one-way w/ handshake
+      
+      RPCContext context = request.getContext();
+      context.setResponseCallMeta(META_READER.read(null, in));
+
+      if (!in.readBoolean()) {                      // no error
+        Object response = readResponse(rm.getResponse(), lm.getResponse(), in);
+        context.setResponse(response);
+        for (RPCPlugin plugin : rpcMetaPlugins) {
+          plugin.clientReceiveResponse(context);
+        }
+        return response;
+
+      } else {
+        Exception error = readError(rm.getErrors(), lm.getErrors(), in);
+        context.setError(error);
+        for (RPCPlugin plugin : rpcMetaPlugins) {
+          plugin.clientReceiveResponse(context);
+        }
+        throw error;
+      }
+    }
+  }
+}

Modified: avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/Responder.java
URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/Responder.java?rev=1136961&r1=1136960&r2=1136961&view=diff
==============================================================================
--- avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/Responder.java (original)
+++ avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/Responder.java Fri Jun 17 17:55:21 2011
@@ -21,9 +21,8 @@ package org.apache.avro.ipc;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.List;
 import java.util.Map;
 
@@ -62,12 +61,12 @@ public abstract class Responder {
   private static final ThreadLocal<Protocol> REMOTE =
     new ThreadLocal<Protocol>();
 
-  private Map<MD5,Protocol> protocols
-    = Collections.synchronizedMap(new HashMap<MD5,Protocol>());
+  private final Map<MD5,Protocol> protocols
+    = new ConcurrentHashMap<MD5,Protocol>();
 
-  private Protocol local;
-  private MD5 localHash;
-  protected List<RPCPlugin> rpcMetaPlugins;
+  private final Protocol local;
+  private final MD5 localHash;
+  protected final List<RPCPlugin> rpcMetaPlugins;
 
   protected Responder(Protocol local) {
     this.local = local;
@@ -75,7 +74,7 @@ public abstract class Responder {
     localHash.bytes(local.getMD5());
     protocols.put(localHash, local);
     this.rpcMetaPlugins =
-      Collections.synchronizedList(new ArrayList<RPCPlugin>());
+      new CopyOnWriteArrayList<RPCPlugin>();
   }
 
   /** Return the remote protocol.  Accesses a {@link ThreadLocal} that's set

Modified: avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/Transceiver.java
URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/Transceiver.java?rev=1136961&r1=1136960&r2=1136961&view=diff
==============================================================================
--- avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/Transceiver.java (original)
+++ avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/Transceiver.java Fri Jun 17 17:55:21 2011
@@ -22,21 +22,58 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.avro.Protocol;
 
 /** Base transport class used by {@link Requestor}. */
 public abstract class Transceiver implements Closeable {
+  private final ReentrantLock channelLock = new ReentrantLock();
 
   public abstract String getRemoteName();
+  
+  /**
+   * Acquires an exclusive lock on the transceiver's channel.
+   */
+  public void lockChannel() {
+    channelLock.lock();
+  }
+  
+  /**
+   * Releases the lock on the transceiver's channel if held by the calling thread.
+   */
+  public void unlockChannel() {
+    if (channelLock.isHeldByCurrentThread()) {
+      channelLock.unlock();
+    }
+  }
 
   /** Called by {@link Requestor#request(String,Object)} for two-way messages.
    * By default calls {@link #writeBuffers(List)} followed by
    * {@link #readBuffers()}. */
-  public synchronized List<ByteBuffer> transceive(List<ByteBuffer> request)
+  public List<ByteBuffer> transceive(List<ByteBuffer> request)
+    throws IOException {
+    lockChannel();
+    try {
+      writeBuffers(request);
+      return readBuffers();
+    } finally {
+      unlockChannel();
+    }
+  }
+  
+  /** 
+   * Called by {@link Requestor#request(String,Object,Callback)} for two-way messages using callbacks.
+   */
+  public void transceive(List<ByteBuffer> request, Callback<List<ByteBuffer>> callback)
     throws IOException {
-    writeBuffers(request);
-    return readBuffers();
+    // The default implementation works synchronously
+    try {
+      List<ByteBuffer> response = transceive(request);
+      callback.handleResult(response);
+    } catch (IOException e) {
+      callback.handleError(e);
+    }
   }
 
   /** Called by the default definition of {@link #transceive(List)}.*/

Modified: avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/generic/GenericRequestor.java
URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/generic/GenericRequestor.java?rev=1136961&r1=1136960&r2=1136961&view=diff
==============================================================================
--- avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/generic/GenericRequestor.java (original)
+++ avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/generic/GenericRequestor.java Fri Jun 17 17:55:21 2011
@@ -28,6 +28,7 @@ import org.apache.avro.generic.GenericDa
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.io.Decoder;
 import org.apache.avro.io.Encoder;
+import org.apache.avro.ipc.Callback;
 import org.apache.avro.ipc.Requestor;
 import org.apache.avro.ipc.Transceiver;
 
@@ -53,6 +54,20 @@ public class GenericRequestor extends Re
   }
 
   @Override
+  public <T> void request(String messageName, Object request, Callback<T> callback)
+    throws IOException {
+    try {
+      super.request(messageName, request, callback);
+    } catch (Exception e) {
+      if (e instanceof RuntimeException)
+        throw (RuntimeException)e;
+      if (e instanceof IOException)
+        throw (IOException)e;
+      throw new AvroRemoteException(e);
+    }
+  }
+
+  @Override
   public void writeRequest(Schema schema, Object request, Encoder out)
     throws IOException {
     new GenericDatumWriter<Object>(schema).write(request, out);

Modified: avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/specific/SpecificRequestor.java
URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/specific/SpecificRequestor.java?rev=1136961&r1=1136960&r2=1136961&view=diff
==============================================================================
--- avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/specific/SpecificRequestor.java (original)
+++ avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/specific/SpecificRequestor.java Fri Jun 17 17:55:21 2011
@@ -22,6 +22,8 @@ import java.io.IOException;
 import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
 import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Type;
+import java.util.Arrays;
 
 import org.apache.avro.Protocol;
 import org.apache.avro.Schema;
@@ -32,6 +34,7 @@ import org.apache.avro.io.Decoder;
 import org.apache.avro.io.Encoder;
 import org.apache.avro.ipc.Transceiver;
 import org.apache.avro.ipc.Requestor;
+import org.apache.avro.ipc.Callback;
 import org.apache.avro.specific.SpecificData;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificDatumWriter;
@@ -52,7 +55,20 @@ public class SpecificRequestor extends R
   @Override
   public Object invoke(Object proxy, Method method, Object[] args)
     throws Throwable {
-    return request(method.getName(), args);
+    // Check if this is a callback-based RPC:
+    Type[] parameterTypes = method.getParameterTypes();
+    if ((parameterTypes.length > 0) &&
+        (parameterTypes[parameterTypes.length - 1] instanceof Class) &&
+        Callback.class.isAssignableFrom(((Class<?>)parameterTypes[parameterTypes.length - 1]))) {
+      // Extract the Callback from the end of the argument list
+      Object[] finalArgs = Arrays.copyOf(args, args.length - 1);
+      Callback<?> callback = (Callback<?>)args[args.length - 1];
+      request(method.getName(), finalArgs, callback);
+      return null;
+    }
+    else {
+      return request(method.getName(), args);
+    }
   }
 
   protected DatumWriter<Object> getDatumWriter(Schema schema) {

Modified: avro/branches/branch-1.5/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServer.java
URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServer.java?rev=1136961&r1=1136960&r2=1136961&view=diff
==============================================================================
--- avro/branches/branch-1.5/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServer.java (original)
+++ avro/branches/branch-1.5/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServer.java Fri Jun 17 17:55:21 2011
@@ -18,7 +18,11 @@
 
 package org.apache.avro.ipc;
 
+import static org.junit.Assert.assertEquals;
+
 import java.net.InetSocketAddress;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import junit.framework.Assert;
 
@@ -27,52 +31,109 @@ import org.apache.avro.ipc.specific.Spec
 import org.apache.avro.test.Mail;
 import org.apache.avro.test.Message;
 import org.apache.avro.util.Utf8;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class TestNettyServer {
 
+  private static Server server;
+  private static Transceiver transceiver;
+  private static Mail proxy;
+  private static MailImpl mailService;
+
   public static class MailImpl implements Mail {
+
+    private CountDownLatch allMessages = new CountDownLatch(5);
+    
     // in this simple example just return details of the message
     public CharSequence send(Message message) {
       return new Utf8("Sent message to [" + message.to.toString() + "] from ["
           + message.from.toString() + "] with body [" + message.body.toString()
           + "]");
     }
-  }
+    
+    public void fireandforget(Message message) {
+      allMessages.countDown();
+    }
+    
+    private void awaitMessages() throws InterruptedException {
+      allMessages.await(2, TimeUnit.SECONDS);
+    }
+    
+    private void assertAllMessagesReceived() {
+      assertEquals(0, allMessages.getCount());
+    }
 
-  @Test
-  public void test() throws Exception {
+    public void reset() {
+      allMessages = new CountDownLatch(5);      
+    }
+  }
+  
+  @BeforeClass
+  public static void initializeConnections()throws Exception {
     // start server
     System.out.println("starting server...");
-    Responder responder = new SpecificResponder(Mail.class, new MailImpl());
-    Server server = new NettyServer(responder, new InetSocketAddress(0));
+    mailService = new MailImpl();
+    Responder responder = new SpecificResponder(Mail.class, mailService);
+    server = new NettyServer(responder, new InetSocketAddress(0));
     server.start();
-
+  
     int serverPort = server.getPort();
     System.out.println("server port : " + serverPort);
 
-    // client
-    Transceiver transceiver = new NettyTransceiver(new InetSocketAddress(
+    transceiver = new NettyTransceiver(new InetSocketAddress(
         serverPort));
-    Mail proxy = SpecificRequestor.getClient(Mail.class, transceiver);
+    proxy = SpecificRequestor.getClient(Mail.class, transceiver);
+  }
+  
+  @AfterClass
+  public static void tearDownConnections() throws Exception{
+    transceiver.close();
+    server.close();
+  }
 
+  @Test
+  public void testRequestResponse() throws Exception {
+      for(int x = 0; x < 5; x++) {
+        verifyResponse(proxy.send(createMessage()));
+      }
+  }
+
+  private void verifyResponse(CharSequence result) {
+    Assert.assertEquals(
+        "Sent message to [wife] from [husband] with body [I love you!]",
+        result.toString());
+  }
+  
+  @Test
+  public void testOneway() throws Exception {
+    for (int x = 0; x < 5; x++) {
+      proxy.fireandforget(createMessage());
+    }
+    mailService.awaitMessages();
+    mailService.assertAllMessagesReceived();
+  }
+  
+  @Test
+  public void testMixtureOfRequests() throws Exception {
+    mailService.reset();
+    for (int x = 0; x < 5; x++) {
+      Message createMessage = createMessage();
+      proxy.fireandforget(createMessage);
+      verifyResponse(proxy.send(createMessage));
+    }
+    mailService.awaitMessages();
+    mailService.assertAllMessagesReceived();
+
+  }
+
+  private Message createMessage() {
     Message msg = new Message();
     msg.to = new Utf8("wife");
     msg.from = new Utf8("husband");
     msg.body = new Utf8("I love you!");
-
-    try {
-      for(int x = 0; x < 5; x++) {
-        CharSequence result = proxy.send(msg);
-        System.out.println("Result: " + result);
-        Assert.assertEquals(
-            "Sent message to [wife] from [husband] with body [I love you!]",
-            result.toString());
-      }
-    } finally {
-      transceiver.close();
-      server.close();
-    }
+    return msg;
   }
 
 }

Modified: avro/branches/branch-1.5/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestRpcPluginOrdering.java
URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestRpcPluginOrdering.java?rev=1136961&r1=1136960&r2=1136961&view=diff
==============================================================================
--- avro/branches/branch-1.5/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestRpcPluginOrdering.java (original)
+++ avro/branches/branch-1.5/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestRpcPluginOrdering.java Fri Jun 17 17:55:21 2011
@@ -94,5 +94,7 @@ public class TestRpcPluginOrdering {
     public CharSequence send(Message message) throws AvroRemoteException {
       return new Utf8("Received");
     }
+    public void fireandforget(Message message) {
+    }
   }
 }

Modified: avro/branches/branch-1.5/share/test/schemas/mail.avpr
URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/share/test/schemas/mail.avpr?rev=1136961&r1=1136960&r2=1136961&view=diff
==============================================================================
--- avro/branches/branch-1.5/share/test/schemas/mail.avpr (original)
+++ avro/branches/branch-1.5/share/test/schemas/mail.avpr Fri Jun 17 17:55:21 2011
@@ -15,6 +15,12 @@
      "send": {
          "request": [{"name": "message", "type": "Message"}],
          "response": "string"
+     },
+     "fireandforget": {
+         "request": [{"name": "message", "type": "Message"}],
+         "response": "null",
+         "one-way": true
      }
+
  }
-}
\ No newline at end of file
+}