You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by jb...@apache.org on 2012/02/08 04:24:31 UTC

svn commit: r1241759 - in /avro/trunk: CHANGES.txt lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerConcurrentExecution.java

Author: jbaldassari
Date: Wed Feb  8 03:24:31 2012
New Revision: 1241759

URL: http://svn.apache.org/viewvc?rev=1241759&view=rev
Log:
AVRO-1019. Java: Add unit test for Netty server concurrent execution.


Added:
    avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerConcurrentExecution.java
Modified:
    avro/trunk/CHANGES.txt

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1241759&r1=1241758&r2=1241759&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Wed Feb  8 03:24:31 2012
@@ -58,6 +58,8 @@ Avro 1.6.2 (unreleased)
 
     AVRO-1018. Java: add svn:ignore to eclipse generated files for protobuf, thrift, and archetype modules (scottcarey)
 
+    AVRO-1019. Java: Add unit test for Netty server concurrent execution. (jbaldassari)
+
   BUG FIXES
 
     AVRO-962. Java: Fix Maven plugin to support string type override.

Added: avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerConcurrentExecution.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerConcurrentExecution.java?rev=1241759&view=auto
==============================================================================
--- avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerConcurrentExecution.java (added)
+++ avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerConcurrentExecution.java Wed Feb  8 03:24:31 2012
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.ipc;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+
+import org.apache.avro.AvroRemoteException;
+import org.apache.avro.ipc.specific.SpecificRequestor;
+import org.apache.avro.ipc.specific.SpecificResponder;
+import org.apache.avro.test.Simple;
+import org.apache.avro.test.TestError;
+import org.apache.avro.test.TestRecord;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.jboss.netty.handler.execution.ExecutionHandler;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Verifies that RPCs executed by different client threads using the same 
+ * NettyTransceiver will execute concurrently.  The test follows these steps:
+ * 1. Execute the {@link org.apache.avro.test.Simple#add(int, int)} RPC to 
+ *    complete the Avro IPC handshake.
+ * 2a. In a background thread, wait for the waitLatch.
+ * 3a. In the main thread, invoke 
+ *    {@link org.apache.avro.test.Simple#hello(String)} with the argument 
+ *    "wait".  This causes the SimpleImpl running on the server to count down 
+ *    the wait latch, which will unblock the background thread and allow it to 
+ *    proceed.  After counting down the latch, this call blocks, waiting for 
+ *    {@link org.apache.avro.test.Simple#ack()} to be invoked.
+ * 2b. The background thread wakes up because the waitLatch has been counted 
+ *     down.  Now we know that some thread is executing inside hello(String).
+ *     Next, execute {@link org.apache.avro.test.Simple#ack()} in the 
+ *     background thread, which will allow the thread executing hello(String) 
+ *     to return.
+ * 3b. The thread executing hello(String) on the server unblocks (since ack() 
+ *     has been called), allowing hello(String) to return.
+ * 4. If control returns to the main thread, we know that two RPCs 
+ *    (hello(String) and ack()) were executing concurrently.
+ */
+public class TestNettyServerConcurrentExecution {
+  private Server server;
+  private Transceiver transceiver;
+
+  @After
+  public void cleanUpAfter() throws Exception {
+    try {
+      if (transceiver != null) {
+        transceiver.close();
+      }
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+    try {
+      if (server != null) {
+        server.close();
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+
+  @Test(timeout=30000)
+  public void test() throws Exception {
+    final CountDownLatch waitLatch = new CountDownLatch(1);
+    server = new NettyServer(
+        new SpecificResponder(Simple.class, new SimpleImpl(waitLatch)),
+        new InetSocketAddress(0),
+        new NioServerSocketChannelFactory
+          (Executors.newCachedThreadPool(), Executors.newCachedThreadPool()),
+        new ExecutionHandler(Executors.newCachedThreadPool()));
+    server.start();
+
+    transceiver = new NettyTransceiver(new InetSocketAddress(
+        server.getPort()), TestNettyServer.CONNECT_TIMEOUT_MILLIS);
+    final Simple.Callback simpleClient =
+        SpecificRequestor.getClient(Simple.Callback.class, transceiver);
+
+    // 1. Execute the Simple.add(int, int) RPC to establish the handshake:
+    Assert.assertEquals(3, simpleClient.add(1, 2));
+
+    /*
+     * 2a. In a background thread, wait for the Simple.hello("wait") call to be
+     *    received by the server, then:
+     * 2b. Execute the Simple.ack() RPC, which will unblock the
+     *     Simple.hello("wait") call, allowing it to return to the main thread.
+     */
+    new Thread() {
+      @Override
+      public void run() {
+        setName(TestNettyServerConcurrentExecution.class.getSimpleName() +
+            " Ack Thread");
+        try {
+          // Step 2a: block until SimpleImpl.hello("wait") counts the latch down:
+          waitLatch.await();
+
+          // Step 2b: release the hello("wait") call that is blocked in SimpleImpl:
+          simpleClient.ack();
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+      }
+    }.start();
+
+    /*
+     * 3. Execute the Simple.hello("wait") RPC, which will block until the
+     *    Simple.ack() call has been made by the background thread.
+     */
+    String response = simpleClient.hello("wait");
+
+    // 4. If control reaches here, both RPCs have executed concurrently
+    Assert.assertEquals("wait", response);
+  }
+
+  /**
+   * Implementation of the Simple interface for use with this unit test.
+   * If {@link #hello(String)} is called with "wait" as its argument,
+   * {@link #waitLatch} will be counted down, and {@link #hello(String)} will
+   * block until {@link #ack()} has been invoked.
+   */
+  private static class SimpleImpl implements Simple {
+    private final CountDownLatch waitLatch;
+    private final CountDownLatch ackLatch = new CountDownLatch(1);
+
+    /**
+     * Creates a SimpleImpl that uses the given CountDownLatch.
+     * @param waitLatch the CountDownLatch to use in {@link #hello(String)}.
+     */
+    public SimpleImpl(final CountDownLatch waitLatch) {
+      this.waitLatch = waitLatch;
+    }
+
+    @Override
+    public int add(int arg1, int arg2) throws AvroRemoteException {
+      // Step 1:
+      return arg1 + arg2;
+    }
+
+    @Override
+    public String hello(String greeting) throws AvroRemoteException {
+      if (greeting.equals("wait")) {
+        try {
+          // Step 3a: tell whoever awaits waitLatch that hello("wait") is running:
+          waitLatch.countDown();
+
+          // Step 3b: block here until ack() counts ackLatch down:
+          ackLatch.await();
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          return e.toString();
+        }
+      }
+      return greeting;
+    }
+
+    @Override
+    public void ack() {
+      // Step 2b: unblock the hello("wait") call waiting on ackLatch:
+      ackLatch.countDown();
+    }
+
+    // All RPCs below this line are irrelevant to this test:
+
+    @Override
+    public TestRecord echo(TestRecord record) throws AvroRemoteException {
+      return record;
+    }
+
+    @Override
+    public ByteBuffer echoBytes(ByteBuffer data) throws AvroRemoteException {
+      return data;
+    }
+
+    @Override
+    public Void error() throws AvroRemoteException, TestError {
+      throw new TestError("TestError");
+    }
+  }
+}