You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@thrift.apache.org by br...@apache.org on 2010/08/19 23:41:59 UTC

svn commit: r987323 - in /incubator/thrift/trunk/lib/java: src/org/apache/thrift/async/ test/org/apache/thrift/async/

Author: bryanduxbury
Date: Thu Aug 19 21:41:58 2010
New Revision: 987323

URL: http://svn.apache.org/viewvc?rev=987323&view=rev
Log:
THRIFT-845. java: async client does not respect timeout

This patch adds timeout handling to async method calls through TAsyncClientManager.

Patch: Ning Liang

Modified:
    incubator/thrift/trunk/lib/java/src/org/apache/thrift/async/TAsyncClient.java
    incubator/thrift/trunk/lib/java/src/org/apache/thrift/async/TAsyncClientManager.java
    incubator/thrift/trunk/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java
    incubator/thrift/trunk/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java

Modified: incubator/thrift/trunk/lib/java/src/org/apache/thrift/async/TAsyncClient.java
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/java/src/org/apache/thrift/async/TAsyncClient.java?rev=987323&r1=987322&r2=987323&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/java/src/org/apache/thrift/async/TAsyncClient.java (original)
+++ incubator/thrift/trunk/lib/java/src/org/apache/thrift/async/TAsyncClient.java Thu Aug 19 21:41:58 2010
@@ -27,17 +27,35 @@ public abstract class TAsyncClient {
   protected final TAsyncClientManager manager;
   private TAsyncMethodCall currentMethod;
   private Throwable error;
+  private long timeout;
 
   public TAsyncClient(TProtocolFactory protocolFactory, TAsyncClientManager manager, TNonblockingTransport transport) {
+    this(protocolFactory, manager, transport, 0);
+  }
+
+  public TAsyncClient(TProtocolFactory protocolFactory, TAsyncClientManager manager, TNonblockingTransport transport, long timeout) {
     this.protocolFactory = protocolFactory;
     this.manager = manager;
     this.transport = transport;
+    this.timeout = timeout;
   }
 
   public TProtocolFactory getProtocolFactory() {
     return protocolFactory;
   }
 
+  public long getTimeout() { 
+    return timeout;
+  }
+
+  public boolean hasTimeout() {
+    return timeout > 0;
+  }
+
+  public void setTimeout(long timeout) {
+    this.timeout = timeout;
+  }
+
   /**
    * Is the client in an error state?
    * @return

Modified: incubator/thrift/trunk/lib/java/src/org/apache/thrift/async/TAsyncClientManager.java
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/java/src/org/apache/thrift/async/TAsyncClientManager.java?rev=987323&r1=987322&r2=987323&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/java/src/org/apache/thrift/async/TAsyncClientManager.java (original)
+++ incubator/thrift/trunk/lib/java/src/org/apache/thrift/async/TAsyncClientManager.java Thu Aug 19 21:41:58 2010
@@ -19,14 +19,15 @@
 package org.apache.thrift.async;
 
 import java.io.IOException;
+import java.nio.channels.ClosedSelectorException;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.spi.SelectorProvider;
-import java.nio.channels.CancelledKeyException;
-import java.nio.channels.ClosedChannelException;
-import java.nio.channels.ClosedSelectorException;
+import java.util.HashSet;
 import java.util.Iterator;
+import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
@@ -35,6 +36,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Contains selector thread which transitions method call objects
  */
+@SuppressWarnings("unchecked")
 public class TAsyncClientManager {
   private static final Logger LOGGER = LoggerFactory.getLogger(TAsyncClientManager.class.getName());
 
@@ -57,8 +59,12 @@ public class TAsyncClientManager {
   }
 
   private class SelectThread extends Thread {
+    // Selector waits at most SELECT_TIME milliseconds before waking
+    private static final long SELECT_TIME = 200;
+
     private final Selector selector;
     private volatile boolean running;
+    private final Set<TAsyncMethodCall> timeoutWatchSet = new HashSet<TAsyncMethodCall>();
 
     public SelectThread() throws IOException {
       this.selector = SelectorProvider.provider().openSelector();
@@ -79,46 +85,76 @@ public class TAsyncClientManager {
     public void run() {
       while (running) {
         try {
-          selector.select();
+          selector.select(SELECT_TIME);
         } catch (IOException e) {
           LOGGER.error("Caught IOException in TAsyncClientManager!", e);
         }
 
-        // Handle any ready channels calls
-        try {
-          Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
-          while (keys.hasNext()) {
-            SelectionKey key = keys.next();
-            keys.remove();
-            if (!key.isValid()) {
-              // this should only have happened if the method call experienced an
-              // error and the key was cancelled. just skip it.
-              continue;
-            }
-            TAsyncMethodCall method = (TAsyncMethodCall)key.attachment();
-            method.transition(key);
+        transitionMethods();
+        timeoutIdleMethods();
+        startPendingMethods();
+      }
+    }
+
+    // Transition methods for ready keys
+    private void transitionMethods() {
+      try {
+        Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
+        while (keys.hasNext()) {
+          SelectionKey key = keys.next();
+          keys.remove();
+          if (!key.isValid()) {
+            // this can happen if the method call experienced an error and the key was cancelled
+            // this can also happen if we timeout a method, which results in a channel close
+            // just skip
+            continue;
+          }
+          TAsyncMethodCall methodCall = (TAsyncMethodCall)key.attachment();
+          methodCall.transition(key);
+
+          // If done or error occurred, remove from timeout watch set
+          if (methodCall.isFinished() || methodCall.getClient().hasError()) {
+            timeoutWatchSet.remove(methodCall);
           }
-        } catch (ClosedSelectorException e) {
-          LOGGER.error("Caught ClosedSelectorException in TAsyncClientManager!", e);
         }
+      } catch (ClosedSelectorException e) {
+        LOGGER.error("Caught ClosedSelectorException in TAsyncClientManager!", e);
+      }
+    }
 
-        // Start any new calls
-        TAsyncMethodCall methodCall;
-        while ((methodCall = pendingCalls.poll()) != null) {
-          // Catch registration errors. Method will catch transition errors and cleanup.
-          try {
-            SelectionKey key = methodCall.registerWithSelector(selector);
-            methodCall.transition(key);
-          } catch (ClosedChannelException e) {
-            methodCall.onError(e);
-            LOGGER.warn("Caught ClosedChannelException in TAsyncClientManager!", e);
-          } catch (CancelledKeyException e) {
-            methodCall.onError(e);
-            LOGGER.warn("Caught CancelledKeyExce115ption in TAsyncClientManager!", e);
-          } catch (Exception e) {
-            methodCall.onError(e);
-            LOGGER.warn("Caught unexpected exception in TAsyncClientManager!", e);
-          }          
+    // Timeout any existing method calls
+    private void timeoutIdleMethods() {
+      Iterator<TAsyncMethodCall> iterator = timeoutWatchSet.iterator();
+      while (iterator.hasNext()) {
+        TAsyncMethodCall methodCall = iterator.next();
+        long clientTimeout = methodCall.getClient().getTimeout();
+        long timeElapsed = System.currentTimeMillis() - methodCall.getLastTransitionTime();
+        if (timeElapsed > clientTimeout) {
+          iterator.remove();
+          methodCall.onError(new TimeoutException("Operation " + 
+              methodCall.getClass() + " timed out after " + timeElapsed + 
+              " milliseconds."));
+        }
+      }
+    }
+
+    // Start any new calls
+    private void startPendingMethods() {
+      TAsyncMethodCall methodCall;
+      while ((methodCall = pendingCalls.poll()) != null) {
+        // Catch registration errors. method will catch transition errors and cleanup.
+        try {
+          SelectionKey key = methodCall.registerWithSelector(selector);
+          methodCall.transition(key);
+
+          // If timeout specified and first transition went smoothly, add to timeout watch set
+          TAsyncClient client = methodCall.getClient();
+          if (client.hasTimeout() && !client.hasError()) {
+            timeoutWatchSet.add(methodCall);
+          }
+        } catch (Throwable e) {
+          LOGGER.warn("Caught throwable in TAsyncClientManager!", e);
+          methodCall.onError(e);
         }
       }
     }

Modified: incubator/thrift/trunk/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java?rev=987323&r1=987322&r2=987323&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java (original)
+++ incubator/thrift/trunk/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java Thu Aug 19 21:41:58 2010
@@ -55,6 +55,7 @@ public abstract class TAsyncMethodCall<T
   protected final TAsyncClient client;
   private final AsyncMethodCallback<T> callback;
   private final boolean isOneway;
+  private long lastTransitionTime;
 
   private ByteBuffer sizeBuffer;
   private final byte[] sizeBufferArray = new byte[4];
@@ -76,6 +77,18 @@ public abstract class TAsyncMethodCall<T
     return state;
   }
 
+  protected boolean isFinished() {
+    return state == State.RESPONSE_READ;
+  }
+
+  protected long getLastTransitionTime() {
+    return lastTransitionTime;
+  }
+
+  public TAsyncClient getClient() {
+    return client;
+  }
+
   protected abstract void write_args(TProtocol protocol) throws TException;
 
   protected void prepareMethodCall() throws TException {
@@ -135,13 +148,14 @@ public abstract class TAsyncMethodCall<T
           throw new IllegalStateException("Method call in state " + state 
               + " but selector called transition method. Seems like a bug...");
       }
+      lastTransitionTime = System.currentTimeMillis();
     } catch (Throwable e) {
       key.cancel();
       key.attach(null);
       onError(e);
     }
   }
-  
+
   protected void onError(Throwable e) {
     state = State.ERROR;
     client.onError(e);

Modified: incubator/thrift/trunk/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java?rev=987323&r1=987322&r2=987323&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java (original)
+++ incubator/thrift/trunk/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java Thu Aug 19 21:41:58 2010
@@ -18,6 +18,9 @@
  */
 package org.apache.thrift.async;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import junit.framework.TestCase;
@@ -28,14 +31,12 @@ import org.apache.thrift.server.TNonbloc
 import org.apache.thrift.transport.TNonblockingServerSocket;
 import org.apache.thrift.transport.TNonblockingSocket;
 
-import java.util.List;
-import java.util.ArrayList;
-
 import thrift.test.CompactProtoTestStruct;
 import thrift.test.Srv;
 import thrift.test.Srv.Iface;
 import thrift.test.Srv.AsyncClient.Janky_call;
 import thrift.test.Srv.AsyncClient.onewayMethod_call;
+import thrift.test.Srv.AsyncClient.primitiveMethod_call;
 import thrift.test.Srv.AsyncClient.voidMethod_call;
 
 public class TestTAsyncClientManager extends TestCase {
@@ -58,8 +59,15 @@ public class TestTAsyncClientManager ext
     public void methodWithDefaultArgs(int something) throws TException {
     }
 
+    // Using this method for timeout testing
     @Override
     public int primitiveMethod() throws TException {
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        // TODO Auto-generated catch block
+        e.printStackTrace();
+      }
       return 0;
     }
 
@@ -76,31 +84,31 @@ public class TestTAsyncClientManager ext
     public void onewayMethod() throws TException {
     }
   }
-  
+
   public class JankyRunnable implements Runnable {
     private TAsyncClientManager acm_;
     private int numCalls_;
     private int numSuccesses_ = 0;
     private Srv.AsyncClient client_;
     private TNonblockingSocket clientSocket_;
-    
+
     public JankyRunnable(TAsyncClientManager acm, int numCalls) throws Exception {
       this.acm_ = acm;
       this.numCalls_ = numCalls;
       this.clientSocket_ = new TNonblockingSocket("localhost", 12345);
       this.client_ = new Srv.AsyncClient(new TBinaryProtocol.Factory(), acm_, clientSocket_);
     }
-    
+
     public int getNumSuccesses() {
       return numSuccesses_;
     }
-    
+
     public void run() {
       for (int i = 0; i < numCalls_; i++) {
-        try {          
+        try {
           // connect an async client
           final Object o = new Object();
-          
+
           final AtomicBoolean jankyReturned = new AtomicBoolean(false);
           client_.Janky(1, new AsyncMethodCallback<Srv.AsyncClient.Janky_call>() {
             @Override
@@ -112,28 +120,28 @@ public class TestTAsyncClientManager ext
                   o.notifyAll();
                 }
               } catch (TException e) {
-                e.printStackTrace();               
+                e.printStackTrace();
                 synchronized(o) {
                   o.notifyAll();
                 }
                 fail("unexpected exception: " + e);
-              } 
-              
+              }
             }
-            
+
             @Override
             public void onError(Throwable throwable) {
+              System.out.println(throwable.toString());
               synchronized(o) {
                 o.notifyAll();
               }
-              fail("unexpected exception: " + throwable);             
+              fail("unexpected exception: " + throwable);
             }
           });
-      
+
           synchronized(o) {
             o.wait(1000);
           }
-          
+
           assertTrue(jankyReturned.get());
           this.numSuccesses_++;
         } catch (Exception e) {
@@ -143,6 +151,30 @@ public class TestTAsyncClientManager ext
     }
   }
 
+  public void standardCallTest(Srv.AsyncClient client) throws Exception {
+    final Object o = new Object();
+    final AtomicBoolean jankyReturned = new AtomicBoolean(false);
+    client.Janky(1, new FailureLessCallback<Srv.AsyncClient.Janky_call>() {
+      @Override
+      public void onComplete(Janky_call response) {
+        try {
+          assertEquals(3, response.getResult());
+          jankyReturned.set(true);
+        } catch (TException e) {
+          fail("unexpected exception: " + e);
+        }
+        synchronized(o) {
+          o.notifyAll();
+        }
+      }
+    });
+
+    synchronized(o) {
+      o.wait(100000);
+    }
+    assertTrue(jankyReturned.get());
+  }
+
   public void testIt() throws Exception {
     // put up a server
     final TNonblockingServer s = new TNonblockingServer(new Srv.Processor(new SrvHandler()), new TNonblockingServerSocket(12345));
@@ -164,26 +196,11 @@ public class TestTAsyncClientManager ext
     final Object o = new Object();
 
     // make a standard method call
-    final AtomicBoolean jankyReturned = new AtomicBoolean(false);
-    client.Janky(1, new FailureLessCallback<Srv.AsyncClient.Janky_call>() {
-      @Override
-      public void onComplete(Janky_call response) {
-        try {
-          assertEquals(3, response.getResult());
-          jankyReturned.set(true);
-        } catch (TException e) {
-          fail("unexpected exception: " + e);
-        }
-        synchronized(o) {
-          o.notifyAll();
-        }
-      }
-    });
+    standardCallTest(client);
 
-    synchronized(o) {
-      o.wait(100000);
-    }
-    assertTrue(jankyReturned.get());
+    // make a standard method call that succeeds within timeout
+    client.setTimeout(5000);
+    standardCallTest(client);
 
     // make a void method call
     final AtomicBoolean voidMethodReturned = new AtomicBoolean(false);
@@ -249,7 +266,7 @@ public class TestTAsyncClientManager ext
       o.wait(1000);
     }
     assertTrue(voidAfterOnewayReturned.get());
-    
+
     // make multiple calls with deserialization in the selector thread (repro Eric's issue)
     int numThreads = 500;
     int numCallsPerThread = 100;
@@ -270,5 +287,34 @@ public class TestTAsyncClientManager ext
       numSuccesses += runnable.getNumSuccesses();
     }
     assertEquals(numSuccesses, numThreads * numCallsPerThread);
+
+    // check that timeouts work
+    client.setTimeout(100);
+    client.primitiveMethod(new AsyncMethodCallback<primitiveMethod_call>() {
+
+      @Override
+      public void onError(Throwable throwable) {
+        if (!(throwable instanceof TimeoutException)) {
+          fail("should have received timeout exception");
+          synchronized(o) {
+            o.notifyAll();
+          }
+        }
+      }
+
+      @Override
+      public void onComplete(primitiveMethod_call response) {
+        fail("should not have finished timed out call.");
+        synchronized(o) {
+          o.notifyAll();
+        }
+      }
+
+    });
+    synchronized(o) {
+      o.wait(2000);
+    }
+    assertTrue(client.hasError());
+    assertTrue(client.getError() instanceof TimeoutException);
   }
 }