You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2011/06/17 19:55:22 UTC
svn commit: r1136961 - in /avro/branches/branch-1.5: ./
lang/java/compiler/src/main/velocity/org/apache/avro/compiler/specific/templates/java/classic/
lang/java/ipc/src/main/java/org/apache/avro/ipc/
lang/java/ipc/src/main/java/org/apache/avro/ipc/gene...
Author: cutting
Date: Fri Jun 17 17:55:21 2011
New Revision: 1136961
URL: http://svn.apache.org/viewvc?rev=1136961&view=rev
Log:
Merge r1099257 and r1136342 from trunk. Fixes: AVRO-815 and AVRO-539.
Added:
avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/CallFuture.java
- copied unchanged from r1136342, avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/CallFuture.java
avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/Callback.java
- copied unchanged from r1136342, avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Callback.java
avro/branches/branch-1.5/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolNetty.java
- copied unchanged from r1136342, avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolNetty.java
avro/branches/branch-1.5/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCallbacks.java
- copied unchanged from r1136342, avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCallbacks.java
Modified:
avro/branches/branch-1.5/ (props changed)
avro/branches/branch-1.5/CHANGES.txt
avro/branches/branch-1.5/lang/java/compiler/src/main/velocity/org/apache/avro/compiler/specific/templates/java/classic/protocol.vm
avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyServer.java
avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java
avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/Requestor.java
avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/Responder.java
avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/Transceiver.java
avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/generic/GenericRequestor.java
avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/specific/SpecificRequestor.java
avro/branches/branch-1.5/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServer.java
avro/branches/branch-1.5/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestRpcPluginOrdering.java
avro/branches/branch-1.5/share/test/schemas/mail.avpr
Propchange: avro/branches/branch-1.5/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jun 17 17:55:21 2011
@@ -1 +1 @@
-/avro/trunk:1075938,1075993,1078917,1079055,1079060,1079063,1083246,1085921,1086727,1086730,1086866,1087076,1087129,1087136,1087439-1087440,1087463,1087472,1087792,1089128,1089131,1089550,1094812,1095206-1095208,1095493,1095529,1095548,1095550,1096798,1097916,1097927,1097968,1097974,1102332,1102335,1124127,1124971,1129053,1129071,1129697-1129706,1129729,1130503
+/avro/trunk:1075938,1075993,1078917,1079055,1079060,1079063,1083246,1085921,1086727,1086730,1086866,1087076,1087129,1087136,1087439-1087440,1087463,1087472,1087792,1089128,1089131,1089550,1094812,1095206-1095208,1095493,1095529,1095548,1095550,1096798,1097916,1097927,1097968,1097974,1099257,1102332,1102335,1124127,1124971,1129053,1129071,1129697-1129706,1129729,1130503,1136342
Modified: avro/branches/branch-1.5/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/CHANGES.txt?rev=1136961&r1=1136960&r2=1136961&view=diff
==============================================================================
--- avro/branches/branch-1.5/CHANGES.txt (original)
+++ avro/branches/branch-1.5/CHANGES.txt Fri Jun 17 17:55:21 2011
@@ -8,6 +8,9 @@ Avro 1.5.2 (unreleased)
streaming jobs to easily write Avro format output with "bytes" as
schema. (Tom White via cutting)
+ AVRO-539. Java: Add asynchronous RPC support, through either
+ callbacks or futures. (James Baldassari via cutting)
+
IMPROVEMENTS
AVRO-469. C: Set library's libtool-style soversion when using CMake
@@ -34,6 +37,10 @@ Avro 1.5.2 (unreleased)
AVRO-832. Java: Fix RPC client to correctly perform schema
resolution on message responses. (cutting)
+ AVRO-815. Java: Netty Transceiver fails processing one-way messages.
+ Implemented writeBuffers for the NettyTransceiver to allow it to
+ process one-way messages.
+
Avro 1.5.1 (3 May 2011)
NEW FEATURES
Modified: avro/branches/branch-1.5/lang/java/compiler/src/main/velocity/org/apache/avro/compiler/specific/templates/java/classic/protocol.vm
URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/java/compiler/src/main/velocity/org/apache/avro/compiler/specific/templates/java/classic/protocol.vm?rev=1136961&r1=1136960&r2=1136961&view=diff
==============================================================================
--- avro/branches/branch-1.5/lang/java/compiler/src/main/velocity/org/apache/avro/compiler/specific/templates/java/classic/protocol.vm (original)
+++ avro/branches/branch-1.5/lang/java/compiler/src/main/velocity/org/apache/avro/compiler/specific/templates/java/classic/protocol.vm Fri Jun 17 17:55:21 2011
@@ -21,7 +21,7 @@ package $protocol.getNamespace();
@SuppressWarnings("all")
#if ($protocol.getDoc())
- /** $protocol.getDoc() */
+/** $protocol.getDoc() */
#end
public interface $this.mangle($protocol.getName()) {
public static final org.apache.avro.Protocol PROTOCOL = org.apache.avro.Protocol.parse("${this.javaEscape($protocol.toString())}");
@@ -46,4 +46,31 @@ public interface $this.mangle($protocol.
#end## (one way)
;
#end## (requests)
-}
+
+## Generate nested callback API
+ @SuppressWarnings("all")
+#if ($protocol.getDoc())
+ /** $protocol.getDoc() */
+#end
+ public interface Callback extends $this.mangle($protocol.getName()) {
+ public static final org.apache.avro.Protocol PROTOCOL = #if ($protocol.getNamespace())$protocol.getNamespace().#end${this.mangle($protocol.getName())}.PROTOCOL;
+#foreach ($e in $protocol.getMessages().entrySet())
+#set ($name = $e.getKey())
+#set ($message = $e.getValue())
+#set ($response = $message.getResponse())
+## Generate callback method if the message is not one-way:
+#if (! $message.isOneWay())
+#if ($message.getDoc())
+ /** $this.escapeForJavadoc($message.getDoc()) */
+#end
+ void ${this.mangle($name)}(##
+#foreach ($p in $message.getRequest().getFields())##
+#* *#${this.javaUnbox($p.schema())} ${this.mangle($p.name())}#if ($velocityHasNext), #end
+#end
+#if ($message.getRequest().getFields().size() > 0), #end
+org.apache.avro.ipc.Callback<${this.javaType($response)}> callback) throws java.io.IOException;
+#end## (generate callback method)
+#end## (requests)
+ }## End of Callback interface
+
+}## End of protocol interface
Modified: avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyServer.java
URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyServer.java?rev=1136961&r1=1136960&r2=1136961&view=diff
==============================================================================
--- avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyServer.java (original)
+++ avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyServer.java Fri Jun 17 17:55:21 2011
@@ -54,13 +54,13 @@ public class NettyServer implements Serv
private static final Logger LOG = LoggerFactory.getLogger(NettyServer.class
.getName());
- private Responder responder;
+ private final Responder responder;
- private Channel serverChannel;
- private ChannelGroup allChannels = new DefaultChannelGroup(
+ private final Channel serverChannel;
+ private final ChannelGroup allChannels = new DefaultChannelGroup(
"avro-netty-server");
- private ChannelFactory channelFactory;
- private CountDownLatch closed = new CountDownLatch(1);
+ private final ChannelFactory channelFactory;
+ private final CountDownLatch closed = new CountDownLatch(1);
public NettyServer(Responder responder, InetSocketAddress addr) {
this.responder = responder;
@@ -133,14 +133,13 @@ public class NettyServer implements Serv
NettyDataPack dataPack = (NettyDataPack) e.getMessage();
List<ByteBuffer> req = dataPack.getDatas();
List<ByteBuffer> res = responder.respond(req, connectionMetadata);
- dataPack.setDatas(res);
- e.getChannel().write(dataPack);
+ // response will be null for oneway messages.
+ if(res != null) {
+ dataPack.setDatas(res);
+ e.getChannel().write(dataPack);
+ }
} catch (IOException ex) {
LOG.warn("unexpect error");
- } finally {
- if(!connectionMetadata.isConnected()){
- e.getChannel().close();
- }
}
}
Modified: avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java
URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java?rev=1136961&r1=1136960&r2=1136961&view=diff
==============================================================================
--- avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java (original)
+++ avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java Fri Jun 17 17:55:21 2011
@@ -27,10 +27,8 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.avro.Protocol;
@@ -45,6 +43,7 @@ import org.jboss.netty.channel.ChannelFu
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.ChannelState;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
@@ -61,26 +60,39 @@ public class NettyTransceiver extends Tr
private static final Logger LOG = LoggerFactory.getLogger(NettyTransceiver.class
.getName());
- private ChannelFactory channelFactory;
- private Channel channel;
+ private final AtomicInteger serialGenerator = new AtomicInteger(0);
+ private final Map<Integer, Callback<List<ByteBuffer>>> requests =
+ new ConcurrentHashMap<Integer, Callback<List<ByteBuffer>>>();
- private AtomicInteger serialGenerator = new AtomicInteger(0);
- private Map<Integer, CallFuture> requests =
- new ConcurrentHashMap<Integer, CallFuture>();
+ private final ChannelFactory channelFactory;
+ private final ClientBootstrap bootstrap;
+ private final InetSocketAddress remoteAddr;
- private Protocol remote;
+ /**
+ * Read lock must be acquired whenever using non-final state.
+ * Write lock must be acquired whenever modifying state.
+ */
+ private final ReadWriteLock stateLock = new ReentrantReadWriteLock();
+ private boolean open = false; // Synchronized on stateLock
+ private Channel channel; // Synchronized on stateLock
+ private Protocol remote; // Synchronized on stateLock
+
+ NettyTransceiver() {
+ channelFactory = null;
+ bootstrap = null;
+ remoteAddr = null;
+ }
- NettyTransceiver() {}
-
public NettyTransceiver(InetSocketAddress addr) {
- this(addr, new NioClientSocketChannelFactory(Executors.
- newCachedThreadPool(), Executors.newCachedThreadPool()));
+ this(addr, new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
+ Executors.newCachedThreadPool()));
}
public NettyTransceiver(InetSocketAddress addr, ChannelFactory channelFactory) {
// Set up.
this.channelFactory = channelFactory;
- ClientBootstrap bootstrap = new ClientBootstrap(channelFactory);
+ bootstrap = new ClientBootstrap(channelFactory);
+ remoteAddr = addr;
// Configure the event pipeline factory.
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@@ -97,54 +109,160 @@ public class NettyTransceiver extends Tr
bootstrap.setOption("tcpNoDelay", true);
// Make a new connection.
- ChannelFuture channelFuture = bootstrap.connect(addr);
- channelFuture.awaitUninterruptibly();
- if (!channelFuture.isSuccess()) {
- channelFuture.getCause().printStackTrace();
- throw new RuntimeException(channelFuture.getCause());
+ connect();
+ }
+
+ /**
+ * Connects to the remote peer if not already connected.
+ */
+ private void connect() {
+ stateLock.writeLock().lock();
+ try {
+ if (!open || (channel == null) || !channel.isOpen() || !channel.isBound() || !channel.isConnected()) {
+ LOG.info("Connecting to " + remoteAddr);
+ ChannelFuture channelFuture = bootstrap.connect(remoteAddr);
+ channelFuture.awaitUninterruptibly();
+ if (!channelFuture.isSuccess()) {
+ channelFuture.getCause().printStackTrace();
+ throw new RuntimeException(channelFuture.getCause());
+ }
+ channel = channelFuture.getChannel();
+ open = true;
+ }
+ } finally {
+ stateLock.writeLock().unlock();
+ }
+ }
+
+ /**
+ * Closes the connection to the remote peer if connected.
+ */
+ private void disconnect() {
+ disconnect(false);
+ }
+
+ /**
+ * Closes the connection to the remote peer if connected.
+ * @param awaitCompletion if true, will block until the close has completed.
+ */
+ private void disconnect(boolean awaitCompletion) {
+ stateLock.writeLock().lock();
+ try {
+ if (channel != null) {
+ LOG.info("Disconnecting from " + remoteAddr);
+ ChannelFuture closeFuture = channel.close();
+ if (awaitCompletion) {
+ closeFuture.awaitUninterruptibly();
+ }
+ channel = null;
+ remote = null;
+ open = false;
+ }
+ } finally {
+ stateLock.writeLock().unlock();
}
- channel = channelFuture.getChannel();
+ }
+
+ /**
+ * Netty channels are thread-safe, so there is no need to acquire locks.
+ * This method is a no-op.
+ */
+ @Override
+ public void lockChannel() {
+
+ }
+
+ /**
+ * Netty channels are thread-safe, so there is no need to acquire locks.
+ * This method is a no-op.
+ */
+ @Override
+ public void unlockChannel() {
+
}
public void close() {
- // Close the connection.
- channel.close().awaitUninterruptibly();
- // Shut down all thread pools to exit.
- channelFactory.releaseExternalResources();
+ stateLock.writeLock().lock();
+ try {
+ // Close the connection.
+ disconnect(true);
+ // Shut down all thread pools to exit.
+ channelFactory.releaseExternalResources();
+ } finally {
+ stateLock.writeLock().unlock();
+ }
}
@Override
public String getRemoteName() {
- return channel.getRemoteAddress().toString();
+ stateLock.readLock().lock();
+ try {
+ return channel.getRemoteAddress().toString();
+ } finally {
+ stateLock.readLock().unlock();
+ }
}
/**
* Override as non-synchronized method because the method is thread safe.
*/
@Override
- public List<ByteBuffer> transceive(List<ByteBuffer> request)
- throws IOException {
- int serial = serialGenerator.incrementAndGet();
- NettyDataPack dataPack = new NettyDataPack(serial, request);
- CallFuture callFuture = new CallFuture();
- requests.put(serial, callFuture);
- channel.write(dataPack);
+ public List<ByteBuffer> transceive(List<ByteBuffer> request) {
try {
- return callFuture.get();
+ CallFuture<List<ByteBuffer>> transceiverFuture = new CallFuture<List<ByteBuffer>>();
+ transceive(request, transceiverFuture);
+ return transceiverFuture.get();
} catch (InterruptedException e) {
LOG.warn("failed to get the response", e);
return null;
} catch (ExecutionException e) {
LOG.warn("failed to get the response", e);
return null;
+ }
+ }
+
+ @Override
+ public void transceive(List<ByteBuffer> request, Callback<List<ByteBuffer>> callback) {
+ stateLock.readLock().lock();
+ try {
+ int serial = serialGenerator.incrementAndGet();
+ NettyDataPack dataPack = new NettyDataPack(serial, request);
+ requests.put(serial, callback);
+ writeDataPack(dataPack);
} finally {
- requests.remove(serial);
+ stateLock.readLock().unlock();
}
}
-
+
@Override
public void writeBuffers(List<ByteBuffer> buffers) throws IOException {
- throw new UnsupportedOperationException();
+ writeDataPack(new NettyDataPack(serialGenerator.incrementAndGet(), buffers));
+ }
+
+ /**
+ * Writes a NettyDataPack, reconnecting to the remote peer if necessary.
+ * @param dataPack the data pack to write.
+ */
+ private void writeDataPack(NettyDataPack dataPack) {
+ stateLock.readLock().lock();
+ try {
+ while ((channel == null) || !channel.isOpen() || !channel.isBound() || !channel.isConnected()) {
+ // Need to reconnect
+ // Upgrade to write lock
+ stateLock.readLock().unlock();
+ stateLock.writeLock().lock();
+ try {
+ connect();
+ } finally {
+ // Downgrade to read lock:
+ stateLock.readLock().lock();
+ stateLock.writeLock().unlock();
+ }
+ }
+ channel.write(dataPack);
+ } finally {
+ stateLock.readLock().unlock();
+ }
}
@Override
@@ -154,71 +272,32 @@ public class NettyTransceiver extends Tr
@Override
public Protocol getRemote() {
- return remote;
+ stateLock.readLock().lock();
+ try {
+ return remote;
+ } finally {
+ stateLock.readLock().unlock();
+ }
}
@Override
public boolean isConnected() {
- return remote!=null;
+ stateLock.readLock().lock();
+ try {
+ return remote!=null;
+ } finally {
+ stateLock.readLock().unlock();
+ }
}
@Override
public void setRemote(Protocol protocol) {
- this.remote = protocol;
- }
-
- /**
- * Future class for a RPC call
- */
- class CallFuture implements Future<List<ByteBuffer>>{
- private Semaphore sem = new Semaphore(0);
- private List<ByteBuffer> response = null;
-
- public void setResponse(List<ByteBuffer> response) {
- this.response = response;
- sem.release();
- }
-
- public void releaseSemphore() {
- sem.release();
- }
-
- public List<ByteBuffer> getResponse() {
- return response;
- }
-
- @Override
- public boolean cancel(boolean mayInterruptIfRunning) {
- return false;
- }
-
- @Override
- public boolean isCancelled() {
- return false;
- }
-
- @Override
- public List<ByteBuffer> get() throws InterruptedException,
- ExecutionException {
- sem.acquire();
- return response;
- }
-
- @Override
- public List<ByteBuffer> get(long timeout, TimeUnit unit)
- throws InterruptedException, ExecutionException, TimeoutException {
- if (sem.tryAcquire(timeout, unit)) {
- return response;
- } else {
- throw new TimeoutException();
- }
- }
-
- @Override
- public boolean isDone() {
- return sem.availablePermits()>0;
+ stateLock.writeLock().lock();
+ try {
+ this.remote = protocol;
+ } finally {
+ stateLock.writeLock().unlock();
}
-
}
/**
@@ -231,6 +310,33 @@ public class NettyTransceiver extends Tr
throws Exception {
if (e instanceof ChannelStateEvent) {
LOG.info(e.toString());
+ ChannelStateEvent cse = (ChannelStateEvent)e;
+ if ((cse.getState() == ChannelState.OPEN) && (Boolean.FALSE.equals(cse.getValue()))) {
+ // Server closed connection; disconnect client side
+ LOG.info("Remote peer " + remoteAddr + " closed connection.");
+ stateLock.readLock().lock();
+ boolean readLockAcquired = true;
+ try {
+ // Only disconnect if open to prevent deadlock on close()
+ if (open) {
+ // Upgrade to write lock:
+ stateLock.readLock().unlock();
+ readLockAcquired = false;
+ stateLock.writeLock().lock();
+ try {
+ if (open) {
+ disconnect();
+ }
+ } finally {
+ stateLock.writeLock().unlock();
+ }
+ }
+ } finally {
+ if (readLockAcquired) {
+ stateLock.readLock().unlock();
+ }
+ }
+ }
}
super.handleUpstream(ctx, e);
}
@@ -245,11 +351,15 @@ public class NettyTransceiver extends Tr
@Override
public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e) {
NettyDataPack dataPack = (NettyDataPack)e.getMessage();
- CallFuture callFuture = requests.get(dataPack.getSerial());
- if (callFuture==null) {
+ Callback<List<ByteBuffer>> callback = requests.get(dataPack.getSerial());
+ if (callback==null) {
throw new RuntimeException("Missing previous call info");
}
- callFuture.setResponse(dataPack.getDatas());
+ try {
+ callback.handleResult(dataPack.getDatas());
+ } finally {
+ requests.remove(dataPack.getSerial());
+ }
}
@Override
@@ -257,9 +367,9 @@ public class NettyTransceiver extends Tr
LOG.warn("Unexpected exception from downstream.", e.getCause());
e.getChannel().close();
// let the blocking waiting exit
- Iterator<CallFuture> it = requests.values().iterator();
+ Iterator<Callback<List<ByteBuffer>>> it = requests.values().iterator();
while(it.hasNext()) {
- it.next().releaseSemphore();
+ it.next().handleError(e.getCause());
it.remove();
}
Modified: avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/Requestor.java
URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/Requestor.java?rev=1136961&r1=1136960&r2=1136961&view=diff
==============================================================================
--- avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/Requestor.java (original)
+++ avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/Requestor.java Fri Jun 17 17:55:21 2011
@@ -20,9 +20,11 @@ package org.apache.avro.ipc;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.List;
import java.util.Map;
@@ -57,12 +59,13 @@ public abstract class Requestor {
private static final GenericDatumWriter<Map<CharSequence,ByteBuffer>>
META_WRITER = new GenericDatumWriter<Map<CharSequence,ByteBuffer>>(META);
- private Protocol local;
- private Protocol remote;
- private boolean sendLocalText;
- private Transceiver transceiver;
+ private final Protocol local;
+ private volatile Protocol remote;
+ private volatile boolean sendLocalText;
+ private final Transceiver transceiver;
+ private final ReentrantLock handshakeLock = new ReentrantLock();
- protected List<RPCPlugin> rpcMetaPlugins;
+ protected final List<RPCPlugin> rpcMetaPlugins;
public Protocol getLocal() { return local; }
public Transceiver getTransceiver() { return transceiver; }
@@ -72,7 +75,7 @@ public abstract class Requestor {
this.local = local;
this.transceiver = transceiver;
this.rpcMetaPlugins =
- Collections.synchronizedList(new ArrayList<RPCPlugin>());
+ new CopyOnWriteArrayList<RPCPlugin>();
}
/**
@@ -85,93 +88,96 @@ public abstract class Requestor {
}
private static final EncoderFactory ENCODER_FACTORY = new EncoderFactory();
- private BinaryEncoder encoder =
- ENCODER_FACTORY.binaryEncoder(new ByteBufferOutputStream(), null);
/** Writes a request message and reads a response or error message. */
- public synchronized Object request(String messageName, Object request)
+ public Object request(String messageName, Object request)
throws Exception {
- Transceiver t = getTransceiver();
- BinaryDecoder in = null;
- Message m;
- RPCContext context = new RPCContext();
- do {
- ByteBufferOutputStream bbo = new ByteBufferOutputStream();
- //safe to use encoder because this is synchronized
- BinaryEncoder out = ENCODER_FACTORY.binaryEncoder(bbo, encoder);
-
- // use local protocol to write request
- m = getLocal().getMessages().get(messageName);
- if (m == null)
- throw new AvroRuntimeException("Not a local message: "+messageName);
- context.setMessage(m);
+ // Initialize request
+ Request rpcRequest = new Request(messageName, request, new RPCContext());
+ CallFuture<Object> future = /* only need a Future for two-way messages */
+ rpcRequest.getMessage().isOneWay() ? null : new CallFuture<Object>();
- writeRequest(m.getRequest(), request, out); // write request payload
-
- out.flush();
- List<ByteBuffer> payload = bbo.getBufferList();
-
- writeHandshake(out); // prepend handshake if needed
-
- context.setRequestPayload(payload);
- for (RPCPlugin plugin : rpcMetaPlugins) {
- plugin.clientSendRequest(context); // get meta-data from plugins
+ // Send request
+ request(rpcRequest, future);
+
+ if (future == null) // the message is one-way, so return immediately
+ return null;
+ try { // the message is two-way, wait for the result
+ return future.get();
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof Exception) {
+ throw (Exception)e.getCause();
+ } else {
+ throw new RuntimeException(e.getCause());
}
- META_WRITER.write(context.requestCallMeta(), out);
-
- out.writeString(m.getName()); // write message name
-
- out.flush();
- bbo.append(payload);
-
- List<ByteBuffer> requestBytes = bbo.getBufferList();
-
- if (m.isOneWay() && t.isConnected()) { // send one-way message
- t.writeBuffers(requestBytes);
-
- return null;
- } else { // two-way message
- List<ByteBuffer> response = t.transceive(requestBytes);
- ByteBufferInputStream bbi = new ByteBufferInputStream(response);
- in = DecoderFactory.get().binaryDecoder(bbi, in);
+ }
+ }
+
+ /**
+ * Writes a request message and returns the result through a Callback.
+ * Clients can also use a Future interface by creating a new CallFuture<T>,
+ * passing it in as the Callback parameter, and then waiting on that Future.
+ * @param <T> the return type of the message.
+ * @param messageName the name of the message to invoke.
+ * @param request the request data to send.
+ * @param callback the callback which will be invoked when the response is returned
+ * or an error occurs.
+ * @throws Exception if an error occurs sending the message.
+ */
+ public <T> void request(String messageName, Object request, Callback<T> callback)
+ throws Exception {
+ request(new Request(messageName, request, new RPCContext()), callback);
+ }
+
+ /** Writes a request message and returns the result through a Callback. */
+ <T> void request(Request request, Callback<T> callback)
+ throws Exception {
+ Transceiver t = getTransceiver();
+ if (!t.isConnected()) {
+ // Acquire handshake lock so that only one thread is performing the
+ // handshake and other threads block until the handshake is completed
+ handshakeLock.lock();
+ try {
+ if (t.isConnected()) {
+ // Another thread already completed the handshake; no need to hold
+ // the handshake lock
+ handshakeLock.unlock();
+ } else {
+ CallFuture<T> callFuture = new CallFuture<T>(callback);
+ t.transceive(request.getBytes(),
+ new TransceiverCallback<T>(request, callFuture));
+ // Block until handshake complete
+ callFuture.await();
+ return;
+ }
+ } finally{
+ if (handshakeLock.isHeldByCurrentThread()) {
+ handshakeLock.unlock();
+ }
}
- } while (!readHandshake(in));
-
- // use remote protocol to read response
- Message rm = remote.getMessages().get(messageName);
- if (rm == null)
- throw new AvroRuntimeException("Not a remote message: "+messageName);
-
- if ((m.isOneWay() != rm.isOneWay()) && t.isConnected())
- throw new AvroRuntimeException("Not both one-way messages: "+messageName);
-
- if (m.isOneWay() && t.isConnected()) return null; // one-way w/ handshake
-
- context.setResponseCallMeta(META_READER.read(null, in));
-
- if (!in.readBoolean()) { // no error
- Object response = readResponse(rm.getResponse(), m.getResponse(), in);
- context.setResponse(response);
- for (RPCPlugin plugin : rpcMetaPlugins) {
- plugin.clientReceiveResponse(context);
+ }
+
+ if (request.getMessage().isOneWay()) {
+ t.lockChannel();
+ try {
+ t.writeBuffers(request.getBytes());
+ if (callback != null) {
+ callback.handleResult(null);
+ }
+ } finally {
+ t.unlockChannel();
}
- return response;
-
} else {
- Exception error = readError(rm.getErrors(), m.getErrors(), in);
- context.setError(error);
- for (RPCPlugin plugin : rpcMetaPlugins) {
- plugin.clientReceiveResponse(context);
- }
- throw error;
+ t.transceive(request.getBytes(),
+ new TransceiverCallback<T>(request, callback));
}
}
- private static final Map<String,MD5> REMOTE_HASHES =
- Collections.synchronizedMap(new HashMap<String,MD5>());
- private static final Map<MD5,Protocol> REMOTE_PROTOCOLS =
- Collections.synchronizedMap(new HashMap<MD5,Protocol>());
+ private static final ConcurrentMap<String,MD5> REMOTE_HASHES =
+ new ConcurrentHashMap<String,MD5>();
+ private static final ConcurrentMap<MD5,Protocol> REMOTE_PROTOCOLS =
+ new ConcurrentHashMap<MD5,Protocol>();
private static final SpecificDatumWriter<HandshakeRequest> HANDSHAKE_WRITER =
new SpecificDatumWriter<HandshakeRequest>(HandshakeRequest.class);
@@ -185,10 +191,11 @@ public abstract class Requestor {
localHash.bytes(local.getMD5());
String remoteName = transceiver.getRemoteName();
MD5 remoteHash = REMOTE_HASHES.get(remoteName);
- remote = REMOTE_PROTOCOLS.get(remoteHash);
if (remoteHash == null) { // guess remote is local
remoteHash = localHash;
remote = local;
+ } else {
+ remote = REMOTE_PROTOCOLS.get(remoteHash);
}
HandshakeRequest handshake = new HandshakeRequest();
handshake.clientHash = localHash;
@@ -244,30 +251,36 @@ public abstract class Requestor {
remote = Protocol.parse(handshake.serverProtocol.toString());
MD5 remoteHash = (MD5)handshake.serverHash;
REMOTE_HASHES.put(transceiver.getRemoteName(), remoteHash);
- if (!REMOTE_PROTOCOLS.containsKey(remoteHash))
- REMOTE_PROTOCOLS.put(remoteHash, remote);
+ REMOTE_PROTOCOLS.putIfAbsent(remoteHash, remote);
}
/** Return the remote protocol. Force a handshake if required. */
- public synchronized Protocol getRemote() throws IOException {
+ public Protocol getRemote() throws IOException {
if (remote != null) return remote; // already have it
MD5 remoteHash = REMOTE_HASHES.get(transceiver.getRemoteName());
- remote = REMOTE_PROTOCOLS.get(remoteHash);
- if (remote != null) return remote; // already cached
- // force handshake
- ByteBufferOutputStream bbo = new ByteBufferOutputStream();
- // direct because the payload is tiny.
- Encoder out = ENCODER_FACTORY.directBinaryEncoder(bbo, null);
- writeHandshake(out);
- out.writeInt(0); // empty metadata
- out.writeString(""); // bogus message name
- List<ByteBuffer> response =
- getTransceiver().transceive(bbo.getBufferList());
- ByteBufferInputStream bbi = new ByteBufferInputStream(response);
- BinaryDecoder in =
- DecoderFactory.get().binaryDecoder(bbi, null);
- readHandshake(in);
- return this.remote;
+ if (remoteHash != null) {
+ remote = REMOTE_PROTOCOLS.get(remoteHash);
+ if (remote != null) return remote; // already cached
+ }
+ handshakeLock.lock();
+ try {
+ // force handshake
+ ByteBufferOutputStream bbo = new ByteBufferOutputStream();
+ // direct because the payload is tiny.
+ Encoder out = ENCODER_FACTORY.directBinaryEncoder(bbo, null);
+ writeHandshake(out);
+ out.writeInt(0); // empty metadata
+ out.writeString(""); // bogus message name
+ List<ByteBuffer> response =
+ getTransceiver().transceive(bbo.getBufferList());
+ ByteBufferInputStream bbi = new ByteBufferInputStream(response);
+ BinaryDecoder in =
+ DecoderFactory.get().binaryDecoder(bbi, null);
+ readHandshake(in);
+ return this.remote;
+ } finally {
+ handshakeLock.unlock();
+ }
}
@@ -292,5 +305,249 @@ public abstract class Requestor {
/** Reads an error message. */
public abstract Exception readError(Schema writer, Schema reader, Decoder in)
throws IOException;
-}
+
+ /**
+ * Handles callbacks from transceiver invocations.
+ */
+ protected class TransceiverCallback<T> implements Callback<List<ByteBuffer>> {
+ private final Request request;
+ private final Callback<T> callback;
+
+ /**
+ * Creates a TransceiverCallback.
+ * @param request the request to set.
+ * @param callback the callback to set.
+ */
+ public TransceiverCallback(Request request, Callback<T> callback) {
+ this.request = request;
+ this.callback = callback;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void handleResult(List<ByteBuffer> responseBytes) {
+ ByteBufferInputStream bbi = new ByteBufferInputStream(responseBytes);
+ BinaryDecoder in = DecoderFactory.get().binaryDecoder(bbi, null);
+ try {
+ if (!readHandshake(in)) {
+ // Resend the handshake and return
+ Request handshake = new Request(request);
+ getTransceiver().transceive
+ (handshake.getBytes(),
+ new TransceiverCallback<T>(handshake, callback));
+ return;
+ }
+ } catch (Exception e) {
+ LOG.error("Error handling transceiver callback: " + e, e);
+ }
+
+ // Read response; invoke callback
+ Response response = new Response(request, in);
+ Object responseObject;
+ try {
+ try {
+ responseObject = response.getResponse();
+ } catch (Exception e) {
+ if (callback != null) {
+ callback.handleError(e);
+ }
+ return;
+ }
+ if (callback != null) {
+ callback.handleResult((T)responseObject);
+ }
+ } catch (Throwable t) {
+ LOG.error("Error in callback handler: " + t, t);
+ }
+ }
+
+ @Override
+ public void handleError(Throwable error) {
+ callback.handleError(error);
+ }
+ }
+
+ /**
+ * Encapsulates/generates a request.
+ */
+ class Request {
+ private final String messageName;
+ private final Object request;
+ private final RPCContext context;
+ private final BinaryEncoder encoder;
+ private Message message;
+ private List<ByteBuffer> requestBytes;
+
+ /**
+ * Creates a Request.
+ * @param messageName the name of the message to invoke.
+ * @param request the request data to send.
+ * @param context the RPC context to use.
+ */
+ public Request(String messageName, Object request, RPCContext context) {
+ this(messageName, request, context, null);
+ }
+
+ /**
+ * Creates a Request.
+ * @param messageName the name of the message to invoke.
+ * @param request the request data to send.
+ * @param context the RPC context to use.
+ * @param encoder the BinaryEncoder to use to serialize the request.
+ */
+ public Request(String messageName, Object request, RPCContext context,
+ BinaryEncoder encoder) {
+ this.messageName = messageName;
+ this.request = request;
+ this.context = context;
+ this.encoder =
+ ENCODER_FACTORY.binaryEncoder(new ByteBufferOutputStream(), encoder);
+ }
+
+ /**
+ * Copy constructor.
+ * @param other Request from which to copy fields.
+ */
+ public Request(Request other) {
+ this.messageName = other.messageName;
+ this.request = other.request;
+ this.context = other.context;
+ this.encoder = other.encoder;
+ }
+
+ /**
+ * Gets the message name.
+ * @return the message name.
+ */
+ public String getMessageName() {
+ return messageName;
+ }
+
+ /**
+ * Gets the RPC context.
+ * @return the RPC context.
+ */
+ public RPCContext getContext() {
+ return context;
+ }
+
+ /**
+ * Gets the Message associated with this request.
+ * @return this request's message.
+ */
+ public Message getMessage() {
+ if (message == null) {
+ message = getLocal().getMessages().get(messageName);
+ if (message == null) {
+ throw new AvroRuntimeException("Not a local message: "+messageName);
+ }
+ }
+ return message;
+ }
+
+ /**
+ * Gets the request data, generating it first if necessary.
+ * @return the request data.
+ * @throws Exception if an error occurs generating the request data.
+ */
+ public List<ByteBuffer> getBytes()
+ throws Exception {
+ if (requestBytes == null) {
+ ByteBufferOutputStream bbo = new ByteBufferOutputStream();
+ BinaryEncoder out = ENCODER_FACTORY.binaryEncoder(bbo, encoder);
+
+ // use local protocol to write request
+ Message m = getMessage();
+ context.setMessage(m);
+
+ writeRequest(m.getRequest(), request, out); // write request payload
+
+ out.flush();
+ List<ByteBuffer> payload = bbo.getBufferList();
+
+ writeHandshake(out); // prepend handshake if needed
+
+ context.setRequestPayload(payload);
+ for (RPCPlugin plugin : rpcMetaPlugins) {
+ plugin.clientSendRequest(context); // get meta-data from plugins
+ }
+ META_WRITER.write(context.requestCallMeta(), out);
+ out.writeString(m.getName()); // write message name
+
+ out.flush();
+ bbo.append(payload);
+
+ requestBytes = bbo.getBufferList();
+ }
+ return requestBytes;
+ }
+ }
+
+ /**
+ * Encapsulates/parses a response.
+ */
+ class Response {
+ private final Request request;
+ private final BinaryDecoder in;
+
+ /**
+ * Creates a Response.
+ * @param request the Request associated with this response.
+ */
+ public Response(Request request) {
+ this(request, null);
+ }
+
+ /**
+ * Creates a Response.
+ * @param request the Request associated with this response.
+ * @param in the BinaryDecoder to use to deserialize the response.
+ */
+ public Response(Request request, BinaryDecoder in) {
+ this.request = request;
+ this.in = in;
+ }
+
+ /**
+ * Gets the RPC response, reading/deserializing it first if necessary.
+ * @return the RPC response.
+ * @throws Exception if an error occurs reading/deserializing the response.
+ */
+ public Object getResponse()
+ throws Exception {
+ Message lm = request.getMessage();
+ Message rm = remote.getMessages().get(request.getMessageName());
+ if (rm == null)
+ throw new AvroRuntimeException
+ ("Not a remote message: "+request.getMessageName());
+
+ Transceiver t = getTransceiver();
+ if ((lm.isOneWay() != rm.isOneWay()) && t.isConnected())
+ throw new AvroRuntimeException
+ ("Not both one-way messages: "+request.getMessageName());
+
+ if (lm.isOneWay() && t.isConnected()) return null; // one-way w/ handshake
+
+ RPCContext context = request.getContext();
+ context.setResponseCallMeta(META_READER.read(null, in));
+
+ if (!in.readBoolean()) { // no error
+ Object response = readResponse(rm.getResponse(), lm.getResponse(), in);
+ context.setResponse(response);
+ for (RPCPlugin plugin : rpcMetaPlugins) {
+ plugin.clientReceiveResponse(context);
+ }
+ return response;
+
+ } else {
+ Exception error = readError(rm.getErrors(), lm.getErrors(), in);
+ context.setError(error);
+ for (RPCPlugin plugin : rpcMetaPlugins) {
+ plugin.clientReceiveResponse(context);
+ }
+ throw error;
+ }
+ }
+ }
+}
Modified: avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/Responder.java
URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/Responder.java?rev=1136961&r1=1136960&r2=1136961&view=diff
==============================================================================
--- avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/Responder.java (original)
+++ avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/Responder.java Fri Jun 17 17:55:21 2011
@@ -21,9 +21,8 @@ package org.apache.avro.ipc;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.List;
import java.util.Map;
@@ -62,12 +61,12 @@ public abstract class Responder {
private static final ThreadLocal<Protocol> REMOTE =
new ThreadLocal<Protocol>();
- private Map<MD5,Protocol> protocols
- = Collections.synchronizedMap(new HashMap<MD5,Protocol>());
+ private final Map<MD5,Protocol> protocols
+ = new ConcurrentHashMap<MD5,Protocol>();
- private Protocol local;
- private MD5 localHash;
- protected List<RPCPlugin> rpcMetaPlugins;
+ private final Protocol local;
+ private final MD5 localHash;
+ protected final List<RPCPlugin> rpcMetaPlugins;
protected Responder(Protocol local) {
this.local = local;
@@ -75,7 +74,7 @@ public abstract class Responder {
localHash.bytes(local.getMD5());
protocols.put(localHash, local);
this.rpcMetaPlugins =
- Collections.synchronizedList(new ArrayList<RPCPlugin>());
+ new CopyOnWriteArrayList<RPCPlugin>();
}
/** Return the remote protocol. Accesses a {@link ThreadLocal} that's set
Modified: avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/Transceiver.java
URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/Transceiver.java?rev=1136961&r1=1136960&r2=1136961&view=diff
==============================================================================
--- avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/Transceiver.java (original)
+++ avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/Transceiver.java Fri Jun 17 17:55:21 2011
@@ -22,21 +22,58 @@ import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.avro.Protocol;
/** Base transport class used by {@link Requestor}. */
public abstract class Transceiver implements Closeable {
+ private final ReentrantLock channelLock = new ReentrantLock();
public abstract String getRemoteName();
+
+ /**
+ * Acquires an exclusive lock on the transceiver's channel.
+ */
+ public void lockChannel() {
+ channelLock.lock();
+ }
+
+ /**
+ * Releases the lock on the transceiver's channel if held by the calling thread.
+ */
+ public void unlockChannel() {
+ if (channelLock.isHeldByCurrentThread()) {
+ channelLock.unlock();
+ }
+ }
/** Called by {@link Requestor#request(String,Object)} for two-way messages.
* By default calls {@link #writeBuffers(List)} followed by
* {@link #readBuffers()}. */
- public synchronized List<ByteBuffer> transceive(List<ByteBuffer> request)
+ public List<ByteBuffer> transceive(List<ByteBuffer> request)
+ throws IOException {
+ lockChannel();
+ try {
+ writeBuffers(request);
+ return readBuffers();
+ } finally {
+ unlockChannel();
+ }
+ }
+
+ /**
+ * Called by {@link Requestor#request(String,Object,Callback)} for two-way messages using callbacks.
+ */
+ public void transceive(List<ByteBuffer> request, Callback<List<ByteBuffer>> callback)
throws IOException {
- writeBuffers(request);
- return readBuffers();
+ // The default implementation works synchronously
+ try {
+ List<ByteBuffer> response = transceive(request);
+ callback.handleResult(response);
+ } catch (IOException e) {
+ callback.handleError(e);
+ }
}
/** Called by the default definition of {@link #transceive(List)}.*/
Modified: avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/generic/GenericRequestor.java
URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/generic/GenericRequestor.java?rev=1136961&r1=1136960&r2=1136961&view=diff
==============================================================================
--- avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/generic/GenericRequestor.java (original)
+++ avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/generic/GenericRequestor.java Fri Jun 17 17:55:21 2011
@@ -28,6 +28,7 @@ import org.apache.avro.generic.GenericDa
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.Encoder;
+import org.apache.avro.ipc.Callback;
import org.apache.avro.ipc.Requestor;
import org.apache.avro.ipc.Transceiver;
@@ -53,6 +54,20 @@ public class GenericRequestor extends Re
}
@Override
+ public <T> void request(String messageName, Object request, Callback<T> callback)
+ throws IOException {
+ try {
+ super.request(messageName, request, callback);
+ } catch (Exception e) {
+ if (e instanceof RuntimeException)
+ throw (RuntimeException)e;
+ if (e instanceof IOException)
+ throw (IOException)e;
+ throw new AvroRemoteException(e);
+ }
+ }
+
+ @Override
public void writeRequest(Schema schema, Object request, Encoder out)
throws IOException {
new GenericDatumWriter<Object>(schema).write(request, out);
Modified: avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/specific/SpecificRequestor.java
URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/specific/SpecificRequestor.java?rev=1136961&r1=1136960&r2=1136961&view=diff
==============================================================================
--- avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/specific/SpecificRequestor.java (original)
+++ avro/branches/branch-1.5/lang/java/ipc/src/main/java/org/apache/avro/ipc/specific/SpecificRequestor.java Fri Jun 17 17:55:21 2011
@@ -22,6 +22,8 @@ import java.io.IOException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Type;
+import java.util.Arrays;
import org.apache.avro.Protocol;
import org.apache.avro.Schema;
@@ -32,6 +34,7 @@ import org.apache.avro.io.Decoder;
import org.apache.avro.io.Encoder;
import org.apache.avro.ipc.Transceiver;
import org.apache.avro.ipc.Requestor;
+import org.apache.avro.ipc.Callback;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
@@ -52,7 +55,20 @@ public class SpecificRequestor extends R
@Override
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
- return request(method.getName(), args);
+ // Check if this is a callback-based RPC:
+ Type[] parameterTypes = method.getParameterTypes();
+ if ((parameterTypes.length > 0) &&
+ (parameterTypes[parameterTypes.length - 1] instanceof Class) &&
+ Callback.class.isAssignableFrom(((Class<?>)parameterTypes[parameterTypes.length - 1]))) {
+ // Extract the Callback from the end of the argument list
+ Object[] finalArgs = Arrays.copyOf(args, args.length - 1);
+ Callback<?> callback = (Callback<?>)args[args.length - 1];
+ request(method.getName(), finalArgs, callback);
+ return null;
+ }
+ else {
+ return request(method.getName(), args);
+ }
}
protected DatumWriter<Object> getDatumWriter(Schema schema) {
Modified: avro/branches/branch-1.5/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServer.java
URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServer.java?rev=1136961&r1=1136960&r2=1136961&view=diff
==============================================================================
--- avro/branches/branch-1.5/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServer.java (original)
+++ avro/branches/branch-1.5/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServer.java Fri Jun 17 17:55:21 2011
@@ -18,7 +18,11 @@
package org.apache.avro.ipc;
+import static org.junit.Assert.assertEquals;
+
import java.net.InetSocketAddress;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import junit.framework.Assert;
@@ -27,52 +31,109 @@ import org.apache.avro.ipc.specific.Spec
import org.apache.avro.test.Mail;
import org.apache.avro.test.Message;
import org.apache.avro.util.Utf8;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
import org.junit.Test;
public class TestNettyServer {
+ private static Server server;
+ private static Transceiver transceiver;
+ private static Mail proxy;
+ private static MailImpl mailService;
+
public static class MailImpl implements Mail {
+
+ private CountDownLatch allMessages = new CountDownLatch(5);
+
// in this simple example just return details of the message
public CharSequence send(Message message) {
return new Utf8("Sent message to [" + message.to.toString() + "] from ["
+ message.from.toString() + "] with body [" + message.body.toString()
+ "]");
}
- }
+
+ public void fireandforget(Message message) {
+ allMessages.countDown();
+ }
+
+ private void awaitMessages() throws InterruptedException {
+ allMessages.await(2, TimeUnit.SECONDS);
+ }
+
+ private void assertAllMessagesReceived() {
+ assertEquals(0, allMessages.getCount());
+ }
- @Test
- public void test() throws Exception {
+ public void reset() {
+ allMessages = new CountDownLatch(5);
+ }
+ }
+
+ @BeforeClass
+ public static void initializeConnections()throws Exception {
// start server
System.out.println("starting server...");
- Responder responder = new SpecificResponder(Mail.class, new MailImpl());
- Server server = new NettyServer(responder, new InetSocketAddress(0));
+ mailService = new MailImpl();
+ Responder responder = new SpecificResponder(Mail.class, mailService);
+ server = new NettyServer(responder, new InetSocketAddress(0));
server.start();
-
+
int serverPort = server.getPort();
System.out.println("server port : " + serverPort);
- // client
- Transceiver transceiver = new NettyTransceiver(new InetSocketAddress(
+ transceiver = new NettyTransceiver(new InetSocketAddress(
serverPort));
- Mail proxy = SpecificRequestor.getClient(Mail.class, transceiver);
+ proxy = SpecificRequestor.getClient(Mail.class, transceiver);
+ }
+
+ @AfterClass
+ public static void tearDownConnections() throws Exception{
+ transceiver.close();
+ server.close();
+ }
+ @Test
+ public void testRequestResponse() throws Exception {
+ for(int x = 0; x < 5; x++) {
+ verifyResponse(proxy.send(createMessage()));
+ }
+ }
+
+ private void verifyResponse(CharSequence result) {
+ Assert.assertEquals(
+ "Sent message to [wife] from [husband] with body [I love you!]",
+ result.toString());
+ }
+
+ @Test
+ public void testOneway() throws Exception {
+ for (int x = 0; x < 5; x++) {
+ proxy.fireandforget(createMessage());
+ }
+ mailService.awaitMessages();
+ mailService.assertAllMessagesReceived();
+ }
+
+ @Test
+ public void testMixtureOfRequests() throws Exception {
+ mailService.reset();
+ for (int x = 0; x < 5; x++) {
+ Message createMessage = createMessage();
+ proxy.fireandforget(createMessage);
+ verifyResponse(proxy.send(createMessage));
+ }
+ mailService.awaitMessages();
+ mailService.assertAllMessagesReceived();
+
+ }
+
+ private Message createMessage() {
Message msg = new Message();
msg.to = new Utf8("wife");
msg.from = new Utf8("husband");
msg.body = new Utf8("I love you!");
-
- try {
- for(int x = 0; x < 5; x++) {
- CharSequence result = proxy.send(msg);
- System.out.println("Result: " + result);
- Assert.assertEquals(
- "Sent message to [wife] from [husband] with body [I love you!]",
- result.toString());
- }
- } finally {
- transceiver.close();
- server.close();
- }
+ return msg;
}
}
Modified: avro/branches/branch-1.5/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestRpcPluginOrdering.java
URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestRpcPluginOrdering.java?rev=1136961&r1=1136960&r2=1136961&view=diff
==============================================================================
--- avro/branches/branch-1.5/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestRpcPluginOrdering.java (original)
+++ avro/branches/branch-1.5/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestRpcPluginOrdering.java Fri Jun 17 17:55:21 2011
@@ -94,5 +94,7 @@ public class TestRpcPluginOrdering {
public CharSequence send(Message message) throws AvroRemoteException {
return new Utf8("Received");
}
+ public void fireandforget(Message message) {
+ }
}
}
Modified: avro/branches/branch-1.5/share/test/schemas/mail.avpr
URL: http://svn.apache.org/viewvc/avro/branches/branch-1.5/share/test/schemas/mail.avpr?rev=1136961&r1=1136960&r2=1136961&view=diff
==============================================================================
--- avro/branches/branch-1.5/share/test/schemas/mail.avpr (original)
+++ avro/branches/branch-1.5/share/test/schemas/mail.avpr Fri Jun 17 17:55:21 2011
@@ -15,6 +15,12 @@
"send": {
"request": [{"name": "message", "type": "Message"}],
"response": "string"
+ },
+ "fireandforget": {
+ "request": [{"name": "message", "type": "Message"}],
+ "response": "null",
+ "one-way": true
}
+
}
-}
\ No newline at end of file
+}