You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by jb...@apache.org on 2012/02/08 04:24:31 UTC
svn commit: r1241759 - in /avro/trunk: CHANGES.txt
lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerConcurrentExecution.java
Author: jbaldassari
Date: Wed Feb 8 03:24:31 2012
New Revision: 1241759
URL: http://svn.apache.org/viewvc?rev=1241759&view=rev
Log:
AVRO-1019. Java: Add unit test for Netty server concurrent execution.
Added:
avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerConcurrentExecution.java
Modified:
avro/trunk/CHANGES.txt
Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1241759&r1=1241758&r2=1241759&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Wed Feb 8 03:24:31 2012
@@ -58,6 +58,8 @@ Avro 1.6.2 (unreleased)
AVRO-1018. Java: add svn:ignore to eclipse generated files for protobuf, thrift, and archetype modules (scottcarey)
+ AVRO-1019. Java: Add unit test for Netty server concurrent execution. (jbaldassari)
+
BUG FIXES
AVRO-962. Java: Fix Maven plugin to support string type override.
Added: avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerConcurrentExecution.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerConcurrentExecution.java?rev=1241759&view=auto
==============================================================================
--- avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerConcurrentExecution.java (added)
+++ avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerConcurrentExecution.java Wed Feb 8 03:24:31 2012
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.ipc;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+
+import org.apache.avro.AvroRemoteException;
+import org.apache.avro.ipc.specific.SpecificRequestor;
+import org.apache.avro.ipc.specific.SpecificResponder;
+import org.apache.avro.test.Simple;
+import org.apache.avro.test.TestError;
+import org.apache.avro.test.TestRecord;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.jboss.netty.handler.execution.ExecutionHandler;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Verifies that RPCs executed by different client threads using the same
+ * NettyTransceiver will execute concurrently. The test follows these steps:
+ * 1. Execute the {@link #org.apache.avro.test.Simple.add(int, int)} RPC to
+ * complete the Avro IPC handshake.
+ * 2a. In a background thread, wait for the waitLatch.
+ * 3a. In the main thread, invoke
+ * {@link #org.apache.avro.test.Simple.hello(String)} with the argument
+ * "wait". This causes the ClientImpl running on the server to count down
+ * the wait latch, which will unblock the background thread and allow it to
+ * proceed. After counting down the latch, this call blocks, waiting for
+ * {@link #org.apache.avro.test.Simple.ack()} to be invoked.
+ * 2b. The background thread wakes up because the waitLatch has been counted
+ * down. Now we know that some thread is executing inside hello(String).
+ * Next, execute {@link #org.apache.avro.test.Simple.ack()} in the
+ * background thread, which will allow the thread executing hello(String)
+ * to return.
+ * 3b. The thread executing hello(String) on the server unblocks (since ack()
+ * has been called), allowing hello(String) to return.
+ * 4. If control returns to the main thread, we know that two RPCs
+ * (hello(String) and ack()) were executing concurrently.
+ */
+public class TestNettyServerConcurrentExecution {
+ private Server server;
+ private Transceiver transceiver;
+
+ @After
+ public void cleanUpAfter() throws Exception {
+ try {
+ if (transceiver != null) {
+ transceiver.close();
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ try {
+ if (server != null) {
+ server.close();
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Test(timeout=30000)
+ public void test() throws Exception {
+ final CountDownLatch waitLatch = new CountDownLatch(1);
+ server = new NettyServer(
+ new SpecificResponder(Simple.class, new SimpleImpl(waitLatch)),
+ new InetSocketAddress(0),
+ new NioServerSocketChannelFactory
+ (Executors.newCachedThreadPool(), Executors.newCachedThreadPool()),
+ new ExecutionHandler(Executors.newCachedThreadPool()));
+ server.start();
+
+ transceiver = new NettyTransceiver(new InetSocketAddress(
+ server.getPort()), TestNettyServer.CONNECT_TIMEOUT_MILLIS);
+ final Simple.Callback simpleClient =
+ SpecificRequestor.getClient(Simple.Callback.class, transceiver);
+
+ // 1. Execute the Client.add(int, int) RPC to establish the handshake:
+ Assert.assertEquals(3, simpleClient.add(1, 2));
+
+ /*
+ * 2a. In a background thread, wait for the Client.hello("wait") call to be
+ * received by the server, then:
+ * 2b. Execute the Client.ack() RPC, which will unblock the
+ * Client.hello("wait") call, allowing it to return to the main thread.
+ */
+ new Thread() {
+ @Override
+ public void run() {
+ setName(TestNettyServerConcurrentExecution.class.getSimpleName() +
+ "Ack Thread");
+ try {
+ // Step 2a:
+ waitLatch.await();
+
+ // Step 2b:
+ simpleClient.ack();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }.start();
+
+ /*
+ * 3. Execute the Client.hello("wait") RPC, which will block until the
+ * Client.ack() call has completed in the background thread.
+ */
+ String response = simpleClient.hello("wait");
+
+ // 4. If control reaches here, both RPCs have executed concurrently
+ Assert.assertEquals("wait", response);
+ }
+
+ /**
+ * Implementation of the Simple interface for use with this unit test.
+ * If {@link #hello(String)} is called with "wait" as its argument,
+ * {@link #waitLatch} will be counted down, and {@link #hello(String)} will
+ * block until {@link #ack()} has been invoked.
+ */
+ private static class SimpleImpl implements Simple {
+ private final CountDownLatch waitLatch;
+ private final CountDownLatch ackLatch = new CountDownLatch(1);
+
+ /**
+ * Creates a SimpleImpl that uses the given CountDownLatch.
+ * @param waitLatch the CountDownLatch to use in {@link #hello(String)}.
+ */
+ public SimpleImpl(final CountDownLatch waitLatch) {
+ this.waitLatch = waitLatch;
+ }
+
+ @Override
+ public int add(int arg1, int arg2) throws AvroRemoteException {
+ // Step 1:
+ return arg1 + arg2;
+ }
+
+ @Override
+ public String hello(String greeting) throws AvroRemoteException {
+ if (greeting.equals("wait")) {
+ try {
+ // Step 3a:
+ waitLatch.countDown();
+
+ // Step 3b:
+ ackLatch.await();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return e.toString();
+ }
+ }
+ return greeting;
+ }
+
+ @Override
+ public void ack() {
+ // Step 2b:
+ ackLatch.countDown();
+ }
+
+ // All RPCs below this line are irrelevant to this test:
+
+ @Override
+ public TestRecord echo(TestRecord record) throws AvroRemoteException {
+ return record;
+ }
+
+ @Override
+ public ByteBuffer echoBytes(ByteBuffer data) throws AvroRemoteException {
+ return data;
+ }
+
+ @Override
+ public Void error() throws AvroRemoteException, TestError {
+ throw new TestError("TestError");
+ }
+ }
+}