You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@thrift.apache.org by br...@apache.org on 2010/08/05 23:13:06 UTC

svn commit: r982789 - in /incubator/thrift/trunk/lib/java: src/org/apache/thrift/async/TAsyncClientManager.java src/org/apache/thrift/async/TAsyncMethodCall.java test/org/apache/thrift/async/TestTAsyncClientManager.java

Author: bryanduxbury
Date: Thu Aug  5 21:13:06 2010
New Revision: 982789

URL: http://svn.apache.org/viewvc?rev=982789&view=rev
Log:
THRIFT-836. java: Race condition causes CancelledKeyException in TAsyncClientManager

This patch removes an erroneous key cancellation and adds exception handling for a bunch of different exceptions.
Patch: Ning Liang

Modified:
    incubator/thrift/trunk/lib/java/src/org/apache/thrift/async/TAsyncClientManager.java
    incubator/thrift/trunk/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java
    incubator/thrift/trunk/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java

Modified: incubator/thrift/trunk/lib/java/src/org/apache/thrift/async/TAsyncClientManager.java
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/java/src/org/apache/thrift/async/TAsyncClientManager.java?rev=982789&r1=982788&r2=982789&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/java/src/org/apache/thrift/async/TAsyncClientManager.java (original)
+++ incubator/thrift/trunk/lib/java/src/org/apache/thrift/async/TAsyncClientManager.java Thu Aug  5 21:13:06 2010
@@ -22,6 +22,9 @@ import java.io.IOException;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.spi.SelectorProvider;
+import java.nio.channels.CancelledKeyException;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.ClosedSelectorException;
 import java.util.Iterator;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
@@ -82,28 +85,40 @@ public class TAsyncClientManager {
         }
 
         // Handle any ready channels calls
-        Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
-        while (keys.hasNext()) {
-          SelectionKey key = keys.next();
-          keys.remove();
-          if (!key.isValid()) {
-            // this should only have happened if the method call experienced an
-            // error and the key was cancelled. just skip it.
-            continue;
+        try {
+          Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
+          while (keys.hasNext()) {
+            SelectionKey key = keys.next();
+            keys.remove();
+            if (!key.isValid()) {
+              // this should only have happened if the method call experienced an
+              // error and the key was cancelled. just skip it.
+              continue;
+            }
+            TAsyncMethodCall method = (TAsyncMethodCall)key.attachment();
+            method.transition(key);
           }
-          TAsyncMethodCall method = (TAsyncMethodCall)key.attachment();
-          method.transition(key);
+        } catch (ClosedSelectorException e) {
+          LOGGER.error("Caught ClosedSelectorException in TAsyncClientManager!", e);
         }
 
         // Start any new calls
         TAsyncMethodCall methodCall;
         while ((methodCall = pendingCalls.poll()) != null) {
+          // Catch registration errors. Method will catch transition errors and cleanup.
           try {
             SelectionKey key = methodCall.registerWithSelector(selector);
             methodCall.transition(key);
-          } catch (IOException e) {
-            LOGGER.warn("Caught IOException in TAsyncClientManager!", e);
-          }
+          } catch (ClosedChannelException e) {
+            methodCall.onError(e);
+            LOGGER.warn("Caught ClosedChannelException in TAsyncClientManager!", e);
+          } catch (CancelledKeyException e) {
+            methodCall.onError(e);
+            LOGGER.warn("Caught CancelledKeyException in TAsyncClientManager!", e);
+          } catch (Exception e) {
+            methodCall.onError(e);
+            LOGGER.warn("Caught unexpected exception in TAsyncClientManager!", e);
+          }          
         }
       }
     }

Modified: incubator/thrift/trunk/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java?rev=982789&r1=982788&r2=982789&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java (original)
+++ incubator/thrift/trunk/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java Thu Aug  5 21:13:06 2010
@@ -111,8 +111,7 @@ public abstract class TAsyncMethodCall<T
     if (!key.isValid()) {
       key.cancel();
       Exception e = new TTransportException("Selection key not valid!");
-      client.onError(e);
-      callback.onError(e);
+      onError(e);
       return;
     }
 
@@ -137,13 +136,17 @@ public abstract class TAsyncMethodCall<T
               + " but selector called transition method. Seems like a bug...");
       }
     } catch (Throwable e) {
-      state = State.ERROR;
       key.cancel();
       key.attach(null);
-      client.onError(e);
-      callback.onError(e);
+      onError(e);
     }
   }
+  
+  protected void onError(Throwable e) {
+    state = State.ERROR;
+    client.onError(e);
+    callback.onError(e);
+  }
 
   private void doReadingResponseBody(SelectionKey key) throws IOException {
     if (transport.read(frameBuffer) < 0) {
@@ -159,7 +162,6 @@ public abstract class TAsyncMethodCall<T
     key.interestOps(0);
     // this ensures that the TAsyncMethod instance doesn't hang around
     key.attach(null);
-    key.cancel();
     client.onComplete();
     callback.onComplete((T)this);
   }

Modified: incubator/thrift/trunk/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java?rev=982789&r1=982788&r2=982789&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java (original)
+++ incubator/thrift/trunk/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java Thu Aug  5 21:13:06 2010
@@ -28,6 +28,9 @@ import org.apache.thrift.server.TNonbloc
 import org.apache.thrift.transport.TNonblockingServerSocket;
 import org.apache.thrift.transport.TNonblockingSocket;
 
+import java.util.List;
+import java.util.ArrayList;
+
 import thrift.test.CompactProtoTestStruct;
 import thrift.test.Srv;
 import thrift.test.Srv.Iface;
@@ -73,6 +76,72 @@ public class TestTAsyncClientManager ext
     public void onewayMethod() throws TException {
     }
   }
+  
+  public class JankyRunnable implements Runnable {
+    private TAsyncClientManager acm_;
+    private int numCalls_;
+    private int numSuccesses_ = 0;
+    private Srv.AsyncClient client_;
+    private TNonblockingSocket clientSocket_;
+    
+    public JankyRunnable(TAsyncClientManager acm, int numCalls) throws Exception {
+      this.acm_ = acm;
+      this.numCalls_ = numCalls;
+      this.clientSocket_ = new TNonblockingSocket("localhost", 12345);
+      this.client_ = new Srv.AsyncClient(new TBinaryProtocol.Factory(), acm_, clientSocket_);
+    }
+    
+    public int getNumSuccesses() {
+      return numSuccesses_;
+    }
+    
+    public void run() {
+      for (int i = 0; i < numCalls_; i++) {
+        try {          
+          // connect an async client
+          final Object o = new Object();
+          
+          final AtomicBoolean jankyReturned = new AtomicBoolean(false);
+          client_.Janky(1, new AsyncMethodCallback<Srv.AsyncClient.Janky_call>() {
+            @Override
+            public void onComplete(Janky_call response) {
+              try {
+                assertEquals(3, response.getResult());
+                jankyReturned.set(true);
+                synchronized(o) {
+                  o.notifyAll();
+                }
+              } catch (TException e) {
+                e.printStackTrace();               
+                synchronized(o) {
+                  o.notifyAll();
+                }
+                fail("unexpected exception: " + e);
+              } 
+              
+            }
+            
+            @Override
+            public void onError(Throwable throwable) {
+              synchronized(o) {
+                o.notifyAll();
+              }
+              fail("unexpected exception: " + throwable);             
+            }
+          });
+      
+          synchronized(o) {
+            o.wait(1000);
+          }
+          
+          assertTrue(jankyReturned.get());
+          this.numSuccesses_++;
+        } catch (Exception e) {
+          fail("Unexpected " + e);
+        }
+      }
+    }
+  }
 
   public void testIt() throws Exception {
     // put up a server
@@ -179,7 +248,27 @@ public class TestTAsyncClientManager ext
     synchronized(o) {
       o.wait(1000);
     }
-
     assertTrue(voidAfterOnewayReturned.get());
+    
+    // make multiple calls with deserialization in the selector thread (repro Eric's issue)
+    int numThreads = 500;
+    int numCallsPerThread = 100;
+    List<JankyRunnable> runnables = new ArrayList<JankyRunnable>();
+    List<Thread> threads = new ArrayList<Thread>();
+    for (int i = 0; i < numThreads; i++) {
+      JankyRunnable runnable = new JankyRunnable(acm, numCallsPerThread);
+      Thread thread = new Thread(runnable);
+      thread.start();
+      threads.add(thread);
+      runnables.add(runnable);
+    }
+    for (Thread thread : threads) {
+      thread.join();
+    }
+    int numSuccesses = 0;
+    for (JankyRunnable runnable : runnables) {
+      numSuccesses += runnable.getNumSuccesses();
+    }
+    assertEquals(numSuccesses, numThreads * numCallsPerThread);
   }
 }