You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2011/07/01 18:30:54 UTC

svn commit: r1141980 - in /avro/trunk: ./ lang/java/ipc/src/main/java/org/apache/avro/ipc/ lang/java/ipc/src/main/java/org/apache/avro/ipc/specific/ lang/java/ipc/src/test/java/org/apache/avro/ lang/java/ipc/src/test/java/org/apache/avro/ipc/

Author: cutting
Date: Fri Jul  1 16:30:54 2011
New Revision: 1141980

URL: http://svn.apache.org/viewvc?rev=1141980&view=rev
Log:
AVRO-842. Java: Fix Netty-based IPC client to provide better errors when attempting to use a closed connection.  Contributed by James Baldassari.

Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java
    avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Requestor.java
    avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Transceiver.java
    avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/specific/SpecificRequestor.java
    avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolHttp.java
    avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCallbacks.java

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1141980&r1=1141979&r2=1141980&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Fri Jul  1 16:30:54 2011
@@ -74,6 +74,10 @@ Avro 1.5.2 (unreleased)
 
     AVRO-776. Java: Fix SocketServer to close socket. (scottcarey)
 
+    AVRO-842. Java: Fix Netty-based IPC client to provide better
+    errors when attempting to use a closed connection.
+    (James Baldassari via cutting)
+
 Avro 1.5.1 (3 May 2011)
 
   NEW FEATURES

Modified: avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java?rev=1141980&r1=1141979&r2=1141980&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java (original)
+++ avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java Fri Jul  1 16:30:54 2011
@@ -73,7 +73,6 @@ public class NettyTransceiver extends Tr
    * Write lock must be acquired whenever modifying state.
    */
   private final ReadWriteLock stateLock = new ReentrantReadWriteLock();
-  private boolean open = false;  // Synchronized on stateLock
   private Channel channel;       // Synchronized on stateLock
   private Protocol remote;       // Synchronized on stateLock
 
@@ -83,12 +82,27 @@ public class NettyTransceiver extends Tr
     remoteAddr = null;
   }
 
-  public NettyTransceiver(InetSocketAddress addr) {
+  /**
+   * Creates a NettyTransceiver, and attempts to connect to the given address.
+   * @param addr the address to connect to.
+   * @throws IOException if an error occurs connecting to the given address.
+   */
+  public NettyTransceiver(InetSocketAddress addr) throws IOException {
     this(addr, new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), 
         Executors.newCachedThreadPool()));
   }
 
-  public NettyTransceiver(InetSocketAddress addr, ChannelFactory channelFactory) {
+  /**
+   * Creates a NettyTransceiver, and attempts to connect to the given address.
+   * @param addr the address to connect to.
+   * @param channelFactory the factory to use to create a new Netty Channel.
+   * @throws IOException if an error occurs connecting to the given address.
+   */
+  public NettyTransceiver(InetSocketAddress addr, ChannelFactory channelFactory) throws IOException {
+    if (channelFactory == null) {
+      throw new NullPointerException("channelFactory is null");
+    }
+    
     // Set up.
     this.channelFactory = channelFactory;
     bootstrap = new ClientBootstrap(channelFactory);
@@ -109,43 +123,71 @@ public class NettyTransceiver extends Tr
     bootstrap.setOption("tcpNoDelay", true);
 
     // Make a new connection.
-    connect();
+    stateLock.readLock().lock();
+    try {
+      getChannel();
+    } finally {
+      stateLock.readLock().unlock();
+    }
   }
   
   /**
-   * Connects to the remote peer if not already connected.
+   * Tests whether the given channel is ready for writing.
+   * @return true if the channel is open and ready; false otherwise.
    */
-  private void connect() {
-    stateLock.writeLock().lock();
-    try {
-      if (!open || (channel == null) || !channel.isOpen() || !channel.isBound() || !channel.isConnected()) {
+  private static boolean isChannelReady(Channel channel) {
+    return (channel != null) && 
+      channel.isOpen() && channel.isBound() && channel.isConnected();
+  }
+  
+  /**
+   * Gets the Netty channel.  If the channel is not connected, first attempts 
+   * to connect.
+   * NOTE: The stateLock read lock *must* be acquired before calling this 
+   * method.
+   * @return the Netty channel
+   * @throws IOException if an error occurs connecting the channel.
+   */
+  private Channel getChannel() throws IOException {
+    if (!isChannelReady(channel)) {
+      // Need to reconnect
+      // Upgrade to write lock
+      stateLock.readLock().unlock();
+      stateLock.writeLock().lock();
+      try {
         LOG.info("Connecting to " + remoteAddr);
         ChannelFuture channelFuture = bootstrap.connect(remoteAddr);
         channelFuture.awaitUninterruptibly();
         if (!channelFuture.isSuccess()) {
           channelFuture.getCause().printStackTrace();
-          throw new RuntimeException(channelFuture.getCause());
+          throw new IOException("Error connecting to " + remoteAddr, 
+              channelFuture.getCause());
         }
         channel = channelFuture.getChannel();
-        open = true;
+      } finally {
+        // Downgrade to read lock:
+        stateLock.readLock().lock();
+        stateLock.writeLock().unlock();
       }
-    } finally {
-      stateLock.writeLock().unlock();
     }
+    return channel;
   }
   
   /**
    * Closes the connection to the remote peer if connected.
    */
   private void disconnect() {
-    disconnect(false);
+    disconnect(false, false);
   }
   
   /**
    * Closes the connection to the remote peer if connected.
    * @param awaitCompletion if true, will block until the close has completed.
+   * @param cancelPendingRequests if true, will drain the requests map and 
+   * send an IOException to all Callbacks.
    */
-  private void disconnect(boolean awaitCompletion) {
+  private void disconnect(boolean awaitCompletion, boolean cancelPendingRequests) {
+    Map<Integer, Callback<List<ByteBuffer>>> requestsToCancel = null;
     stateLock.writeLock().lock();
     try {
       if (channel != null) {
@@ -156,11 +198,25 @@ public class NettyTransceiver extends Tr
         }
         channel = null;
         remote = null;
-        open = false;
+        if (cancelPendingRequests) {
+          // Remove all pending requests (will be canceled after relinquishing 
+          // write lock).
+          requestsToCancel = 
+            new ConcurrentHashMap<Integer, Callback<List<ByteBuffer>>>(requests);
+          requests.clear();
+        }
       }
     } finally {
       stateLock.writeLock().unlock();
     }
+    
+    if ((requestsToCancel != null) && !requestsToCancel.isEmpty()) {
+      LOG.warn("Removing " + requestsToCancel.size() + " pending request(s).");
+      for (Callback<List<ByteBuffer>> request : requestsToCancel.values()) {
+        request.handleError(
+            new IOException(getClass().getSimpleName() + " closed"));
+      }
+    }
   }
   
   /**
@@ -182,22 +238,20 @@ public class NettyTransceiver extends Tr
   }
 
   public void close() {
-    stateLock.writeLock().lock();
     try {
-      // Close the connection.
-      disconnect(true);
+      // Close the connection:
+      disconnect(true, true);
+    } finally {
       // Shut down all thread pools to exit.
       channelFactory.releaseExternalResources();
-    } finally {
-      stateLock.writeLock().unlock();
     }
   }
 
   @Override
-  public String getRemoteName() {
+  public String getRemoteName() throws IOException {
     stateLock.readLock().lock();
     try {
-      return channel.getRemoteAddress().toString();
+      return getChannel().getRemoteAddress().toString();
     } finally {
       stateLock.readLock().unlock();
     }
@@ -207,7 +261,8 @@ public class NettyTransceiver extends Tr
    * Override as non-synchronized method because the method is thread safe.
    */
   @Override
-  public List<ByteBuffer> transceive(List<ByteBuffer> request) {
+  public List<ByteBuffer> transceive(List<ByteBuffer> request) 
+    throws IOException {
     try {
       CallFuture<List<ByteBuffer>> transceiverFuture = new CallFuture<List<ByteBuffer>>();
       transceive(request, transceiverFuture);
@@ -222,7 +277,8 @@ public class NettyTransceiver extends Tr
   }
   
   @Override
-  public void transceive(List<ByteBuffer> request, Callback<List<ByteBuffer>> callback) {
+  public void transceive(List<ByteBuffer> request, 
+      Callback<List<ByteBuffer>> callback) throws IOException {
     stateLock.readLock().lock();
     try {
       int serial = serialGenerator.incrementAndGet();
@@ -242,24 +298,12 @@ public class NettyTransceiver extends Tr
   /**
    * Writes a NettyDataPack, reconnecting to the remote peer if necessary.
    * @param dataPack the data pack to write.
+   * @throws IOException if an error occurs connecting to the remote peer.
    */
-  private void writeDataPack(NettyDataPack dataPack) {
+  private void writeDataPack(NettyDataPack dataPack) throws IOException {
     stateLock.readLock().lock();
     try {
-      while ((channel == null) || !channel.isOpen() || !channel.isBound() || !channel.isConnected()) {
-        // Need to reconnect
-        // Upgrade to write lock
-        stateLock.readLock().unlock();
-        stateLock.writeLock().lock();
-        try {
-          connect();
-        } finally {
-          // Downgrade to read lock:
-          stateLock.readLock().lock();
-          stateLock.writeLock().unlock();
-        }
-      }
-      channel.write(dataPack);
+      getChannel().write(dataPack);
     } finally {
       stateLock.readLock().unlock();
     }
@@ -314,28 +358,7 @@ public class NettyTransceiver extends Tr
         if ((cse.getState() == ChannelState.OPEN) && (Boolean.FALSE.equals(cse.getValue()))) {
           // Server closed connection; disconnect client side
           LOG.info("Remote peer " + remoteAddr + " closed connection.");
-          stateLock.readLock().lock();
-          boolean readLockAcquired = true;
-          try {
-            // Only disconnect if open to prevent deadlock on close()
-            if (open) {
-              // Upgrade to write lock:
-              stateLock.readLock().unlock();
-              readLockAcquired = false;
-              stateLock.writeLock().lock();
-              try {
-                if (open) {
-                  disconnect();
-                }
-              } finally {
-                stateLock.writeLock().unlock();
-              }
-            }
-          } finally {
-            if (readLockAcquired) {
-              stateLock.readLock().unlock();
-            }
-          }
+          disconnect();
         }
       }
       super.handleUpstream(ctx, e);

Modified: avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Requestor.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Requestor.java?rev=1141980&r1=1141979&r2=1141980&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Requestor.java (original)
+++ avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Requestor.java Fri Jul  1 16:30:54 2011
@@ -28,6 +28,7 @@ import java.util.concurrent.locks.Reentr
 import java.util.List;
 import java.util.Map;
 
+import org.apache.avro.AvroRemoteException;
 import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.Protocol;
 import org.apache.avro.Schema;
@@ -108,7 +109,7 @@ public abstract class Requestor {
       if (e.getCause() instanceof Exception) {
         throw (Exception)e.getCause();
       } else {
-        throw new RuntimeException(e.getCause());
+        throw new AvroRemoteException(e.getCause());
       }
     }
   }
@@ -247,7 +248,7 @@ public abstract class Requestor {
     return established;
   }
 
-  private void setRemote(HandshakeResponse handshake) {
+  private void setRemote(HandshakeResponse handshake) throws IOException {
     remote = Protocol.parse(handshake.serverProtocol.toString());
     MD5 remoteHash = (MD5)handshake.serverHash;
     REMOTE_HASHES.put(transceiver.getRemoteName(), remoteHash);

Modified: avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Transceiver.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Transceiver.java?rev=1141980&r1=1141979&r2=1141980&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Transceiver.java (original)
+++ avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Transceiver.java Fri Jul  1 16:30:54 2011
@@ -30,7 +30,7 @@ import org.apache.avro.Protocol;
 public abstract class Transceiver implements Closeable {
   private final ReentrantLock channelLock = new ReentrantLock();
 
-  public abstract String getRemoteName();
+  public abstract String getRemoteName() throws IOException;
   
   /**
    * Acquires an exclusive lock on the transceiver's channel.

Modified: avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/specific/SpecificRequestor.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/specific/SpecificRequestor.java?rev=1141980&r1=1141979&r2=1141980&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/specific/SpecificRequestor.java (original)
+++ avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/specific/SpecificRequestor.java Fri Jul  1 16:30:54 2011
@@ -25,6 +25,7 @@ import java.lang.reflect.InvocationHandl
 import java.lang.reflect.Type;
 import java.util.Arrays;
 
+import org.apache.avro.AvroRemoteException;
 import org.apache.avro.Protocol;
 import org.apache.avro.Schema;
 import org.apache.avro.AvroRuntimeException;
@@ -55,19 +56,36 @@ public class SpecificRequestor extends R
   @Override
   public Object invoke(Object proxy, Method method, Object[] args)
     throws Throwable {
-    // Check if this is a callback-based RPC:
-    Type[] parameterTypes = method.getParameterTypes();
-    if ((parameterTypes.length > 0) &&
-        (parameterTypes[parameterTypes.length - 1] instanceof Class) &&
-        Callback.class.isAssignableFrom(((Class<?>)parameterTypes[parameterTypes.length - 1]))) {
-      // Extract the Callback from the end of of the argument list
-      Object[] finalArgs = Arrays.copyOf(args, args.length - 1);
-      Callback<?> callback = (Callback<?>)args[args.length - 1];
-      request(method.getName(), finalArgs, callback);
-      return null;
-    }
-    else {
-      return request(method.getName(), args);
+    try {
+      // Check if this is a callback-based RPC:
+      Type[] parameterTypes = method.getParameterTypes();
+      if ((parameterTypes.length > 0) &&
+          (parameterTypes[parameterTypes.length - 1] instanceof Class) &&
+          Callback.class.isAssignableFrom(((Class<?>)parameterTypes[parameterTypes.length - 1]))) {
+        // Extract the Callback from the end of the argument list
+        Object[] finalArgs = Arrays.copyOf(args, args.length - 1);
+        Callback<?> callback = (Callback<?>)args[args.length - 1];
+        request(method.getName(), finalArgs, callback);
+        return null;
+      }
+      else {
+        return request(method.getName(), args);
+      }
+    } catch (Exception e) {
+      // Check if this is a declared Exception:
+      for (Class<?> exceptionClass : method.getExceptionTypes()) {
+        if (exceptionClass.isAssignableFrom(e.getClass())) {
+          throw e;
+        }
+      }
+      
+      // Next, check for RuntimeExceptions:
+      if (e instanceof RuntimeException) {
+        throw e;
+      }
+      
+      // Not an expected Exception, so wrap it in AvroRemoteException:
+      throw new AvroRemoteException(e);
     }
   }
 

Modified: avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolHttp.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolHttp.java?rev=1141980&r1=1141979&r2=1141980&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolHttp.java (original)
+++ avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolHttp.java Fri Jul  1 16:30:54 2011
@@ -34,7 +34,6 @@ import org.junit.Test;
 import java.net.URL;
 import java.net.ServerSocket;
 import java.net.SocketTimeoutException;
-import java.lang.reflect.UndeclaredThrowableException;
 import java.util.ArrayList;
 
 public class TestProtocolHttp extends TestProtocolSpecific {
@@ -62,7 +61,7 @@ public class TestProtocolHttp extends Te
     Simple proxy = SpecificRequestor.getClient(Simple.class, client);
     try {
       proxy.hello("foo");
-    } catch (UndeclaredThrowableException e) {
+    } catch (AvroRemoteException e) {
       throw e.getCause();
     } finally {
       s.close();

Modified: avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCallbacks.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCallbacks.java?rev=1141980&r1=1141979&r2=1141980&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCallbacks.java (original)
+++ avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCallbacks.java Fri Jul  1 16:30:54 2011
@@ -25,6 +25,7 @@ import java.util.concurrent.CountDownLat
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -256,6 +257,119 @@ public class TestNettyServerWithCallback
     Assert.assertFalse("Expected ack flag to be cleared", ackFlag.get());
   }
   
+  @Test
+  public void testSendAfterChannelClose() throws Exception {
+    // Start up a second server so that closing the server doesn't 
+    // interfere with the other unit tests:
+    Server server2 = new NettyServer(new SpecificResponder(Simple.class, simpleService), 
+        new InetSocketAddress(0));
+    server2.start();
+    try {
+      int serverPort = server2.getPort();
+      System.out.println("server2 port : " + serverPort);
+
+      Transceiver transceiver2 = new NettyTransceiver(new InetSocketAddress(
+          serverPort));
+      try {
+        Simple.Callback simpleClient2 = 
+          SpecificRequestor.getClient(Simple.Callback.class, transceiver2);
+
+        // Verify that connection works:
+        Assert.assertEquals(3, simpleClient2.add(1, 2));
+        
+        // Try again with callbacks:
+        CallFuture<Integer> addFuture = new CallFuture<Integer>();
+        simpleClient2.add(1, 2, addFuture);
+        Assert.assertEquals(new Integer(3), addFuture.get());
+
+        // Shut down server:
+        server2.close();
+
+        // Send a new RPC, and verify that it throws an Exception that 
+        // can be detected by the client:
+        boolean ioeCaught = false;
+        try {
+          simpleClient2.add(1, 2);
+          Assert.fail("Send after server close should have thrown Exception");
+        } catch (AvroRemoteException e) {
+          ioeCaught = e.getCause() instanceof IOException;
+          Assert.assertTrue("Expected IOException", ioeCaught);
+        } catch (Exception e) {
+          e.printStackTrace();
+          throw e;
+        }
+        Assert.assertTrue("Expected IOException", ioeCaught);
+        
+        // Send a new RPC with callback, and verify that the correct Exception 
+        // is thrown:
+        ioeCaught = false;
+        try {
+          addFuture = new CallFuture<Integer>();
+          simpleClient2.add(1, 2, addFuture);
+          addFuture.get();
+          Assert.fail("Send after server close should have thrown Exception");
+        } catch (IOException e) {
+          ioeCaught = true;
+        } catch (Exception e) {
+          e.printStackTrace();
+          throw e;
+        }
+        Assert.assertTrue("Expected IOException", ioeCaught);
+      } finally {
+        transceiver2.close();
+      }
+    } finally {
+      server2.close();
+    }
+  }
+  
+  @Test
+  public void cancelPendingRequestsOnTransceiverClose() throws Exception {
+    // Start up a second server so that closing the server doesn't 
+    // interfere with the other unit tests:
+    BlockingSimpleImpl blockingSimpleImpl = new BlockingSimpleImpl();
+    Server server2 = new NettyServer(new SpecificResponder(Simple.class, 
+        blockingSimpleImpl), new InetSocketAddress(0));
+    server2.start();
+    try {
+      int serverPort = server2.getPort();
+      System.out.println("server2 port : " + serverPort);
+
+      CallFuture<Integer> addFuture = new CallFuture<Integer>();
+      Transceiver transceiver2 = new NettyTransceiver(new InetSocketAddress(
+          serverPort));
+      try {        
+        Simple.Callback simpleClient2 = 
+          SpecificRequestor.getClient(Simple.Callback.class, transceiver2);
+        
+        // The first call has to block for the handshake:
+        Assert.assertEquals(3, simpleClient2.add(1, 2));
+        
+        // Now acquire the semaphore so that the server will block:
+        blockingSimpleImpl.acquirePermit();
+        simpleClient2.add(1, 2, addFuture);
+      } finally {
+        // When the transceiver is closed, the CallFuture should get 
+        // an IOException
+        transceiver2.close();
+      }
+      boolean ioeThrown = false;
+      try {
+        addFuture.get();
+      } catch (ExecutionException e) {
+        ioeThrown = e.getCause() instanceof IOException;
+        Assert.assertTrue(e.getCause() instanceof IOException);
+      } catch (Exception e) {
+        e.printStackTrace();
+        Assert.fail("Unexpected Exception: " + e.toString());
+      }
+      Assert.assertTrue("Expected IOException to be thrown", ioeThrown);
+    } finally {
+      blockingSimpleImpl.releasePermit();
+      server2.close();
+    }
+  }
+  
   @Ignore
   @Test
   public void performanceTest() throws Exception {
@@ -342,4 +456,92 @@ public class TestNettyServerWithCallback
       ackLatch.get().countDown();
     }
   }
+  
+  /**
+   * A SimpleImpl that requires a semaphore permit before executing any method.
+   */
+  private static class BlockingSimpleImpl extends SimpleImpl {
+    private final Semaphore semaphore = new Semaphore(1);
+    
+    /**
+     * Creates a BlockingSimpleImpl.
+     */
+    public BlockingSimpleImpl() {
+      super(new AtomicBoolean());
+    }
+    
+    @Override
+    public CharSequence hello(CharSequence greeting) throws AvroRemoteException {
+      acquirePermit();
+      try {
+        return super.hello(greeting);
+      } finally {
+        releasePermit();
+      }
+    }
+
+    @Override
+    public TestRecord echo(TestRecord record) throws AvroRemoteException {
+      acquirePermit();
+      try {
+        return super.echo(record);
+      } finally {
+        releasePermit();
+      }
+    }
+
+    @Override
+    public int add(int arg1, int arg2) throws AvroRemoteException {
+      acquirePermit();
+      try {
+        return super.add(arg1, arg2);
+      } finally {
+        releasePermit();
+      }
+    }
+
+    @Override
+    public ByteBuffer echoBytes(ByteBuffer data) throws AvroRemoteException {
+      acquirePermit();
+      try {
+        return super.echoBytes(data);
+      } finally {
+        releasePermit();
+      }
+    }
+
+    @Override
+    public Void error() throws AvroRemoteException, TestError {
+      acquirePermit();
+      try {
+        return super.error();
+      } finally {
+        releasePermit();
+      }
+    }
+
+    @Override
+    public void ack() {
+      acquirePermit();
+      try {
+        super.ack();
+      } finally {
+        releasePermit();
+      }
+    }
+    
+    /**
+     * Acquires a single permit from the semaphore.
+     */
+    public void acquirePermit() {
+      semaphore.acquireUninterruptibly();
+    }
+    
+    /**
+     * Releases a single permit to the semaphore.
+     */
+    public void releasePermit() {
+      semaphore.release();
+    }
+  }
 }