You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2011/06/16 11:33:29 UTC
svn commit: r1136342 - in /avro/trunk: ./
lang/java/compiler/src/main/velocity/org/apache/avro/compiler/specific/templates/java/classic/
lang/java/ipc/src/main/java/org/apache/avro/ipc/
lang/java/ipc/src/main/java/org/apache/avro/ipc/generic/ lang/java...
Author: cutting
Date: Thu Jun 16 09:33:29 2011
New Revision: 1136342
URL: http://svn.apache.org/viewvc?rev=1136342&view=rev
Log:
AVRO-539. Java: Add asynchronous RPC support, through either callbacks or futures. Contributed by James Baldassari.
Added:
avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/CallFuture.java
avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Callback.java
avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolNetty.java
avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCallbacks.java
Modified:
avro/trunk/CHANGES.txt
avro/trunk/lang/java/compiler/src/main/velocity/org/apache/avro/compiler/specific/templates/java/classic/protocol.vm
avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyServer.java
avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java
avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Requestor.java
avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Responder.java
avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Transceiver.java
avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/generic/GenericRequestor.java
avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/specific/SpecificRequestor.java
Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1136342&r1=1136341&r2=1136342&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Thu Jun 16 09:33:29 2011
@@ -6,6 +6,9 @@ Avro 1.6.0 (unreleased)
NEW FEATURES
+ AVRO-539. Java: Add asynchronous RPC support, through either
+ callbacks or futures. (James Baldassari via cutting)
+
IMPROVEMENTS
AVRO-833. Don't require simplejson for python >= 2.6.
Modified: avro/trunk/lang/java/compiler/src/main/velocity/org/apache/avro/compiler/specific/templates/java/classic/protocol.vm
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/compiler/src/main/velocity/org/apache/avro/compiler/specific/templates/java/classic/protocol.vm?rev=1136342&r1=1136341&r2=1136342&view=diff
==============================================================================
--- avro/trunk/lang/java/compiler/src/main/velocity/org/apache/avro/compiler/specific/templates/java/classic/protocol.vm (original)
+++ avro/trunk/lang/java/compiler/src/main/velocity/org/apache/avro/compiler/specific/templates/java/classic/protocol.vm Thu Jun 16 09:33:29 2011
@@ -21,7 +21,7 @@ package $protocol.getNamespace();
@SuppressWarnings("all")
#if ($protocol.getDoc())
- /** $protocol.getDoc() */
+/** $protocol.getDoc() */
#end
public interface $this.mangle($protocol.getName()) {
public static final org.apache.avro.Protocol PROTOCOL = org.apache.avro.Protocol.parse("${this.javaEscape($protocol.toString())}");
@@ -46,4 +46,31 @@ public interface $this.mangle($protocol.
#end## (one way)
;
#end## (requests)
-}
+
+## Generate nested callback API
+ @SuppressWarnings("all")
+#if ($protocol.getDoc())
+ /** $protocol.getDoc() */
+#end
+ public interface Callback extends $this.mangle($protocol.getName()) {
+ public static final org.apache.avro.Protocol PROTOCOL = #if ($protocol.getNamespace())$protocol.getNamespace().#end${this.mangle($protocol.getName())}.PROTOCOL;
+#foreach ($e in $protocol.getMessages().entrySet())
+#set ($name = $e.getKey())
+#set ($message = $e.getValue())
+#set ($response = $message.getResponse())
+## Generate callback method if the message is not one-way:
+#if (! $message.isOneWay())
+#if ($message.getDoc())
+ /** $this.escapeForJavadoc($message.getDoc()) */
+#end
+ void ${this.mangle($name)}(##
+#foreach ($p in $message.getRequest().getFields())##
+#* *#${this.javaUnbox($p.schema())} ${this.mangle($p.name())}#if ($velocityHasNext), #end
+#end
+#if ($message.getRequest().getFields().size() > 0), #end
+org.apache.avro.ipc.Callback<${this.javaType($response)}> callback) throws java.io.IOException;
+#end## (generate callback method)
+#end## (requests)
+ }## End of Callback interface
+
+}## End of protocol interface
Added: avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/CallFuture.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/CallFuture.java?rev=1136342&view=auto
==============================================================================
--- avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/CallFuture.java (added)
+++ avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/CallFuture.java Thu Jun 16 09:33:29 2011
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.ipc;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * A Future implementation for RPCs.
+ */
+public class CallFuture<T> implements Future<T>, Callback<T> {
+ private final CountDownLatch latch = new CountDownLatch(1);
+ private final Callback<T> chainedCallback;
+ private T result = null;
+ private Throwable error = null;
+
+ /**
+ * Creates a CallFuture.
+ */
+ public CallFuture() {
+ this(null);
+ }
+
+ /**
+ * Creates a CallFuture with a chained Callback which will be invoked
+ * when this CallFuture's Callback methods are invoked.
+ * @param chainedCallback the chained Callback to set.
+ */
+ public CallFuture(Callback<T> chainedCallback) {
+ this.chainedCallback = chainedCallback;
+ }
+
+ /**
+ * Sets the RPC response, and unblocks all threads waiting on {@link #get()}
+ * or {@link #get(long, TimeUnit)}.
+ * @param result the RPC result to set.
+ */
+ @Override
+ public void handleResult(T result) {
+ this.result = result;
+ latch.countDown();
+ if (chainedCallback != null) {
+ chainedCallback.handleResult(result);
+ }
+ }
+
+ /**
+ * Sets an error thrown during RPC execution, and unblocks all threads waiting
+ * on {@link #get()} or {@link #get(long, TimeUnit)}.
+ * @param error the RPC error to set.
+ */
+ @Override
+ public void handleError(Throwable error) {
+ this.error = error;
+ latch.countDown();
+ if (chainedCallback != null) {
+ chainedCallback.handleError(error);
+ }
+ }
+
+ /**
+ * Gets the value of the RPC result without blocking.
+ * Using {@link #get()} or {@link #get(long, TimeUnit)} is usually
+ * preferred because these methods block until the result is available or
+ * an error occurs.
+ * @return the value of the response, or null if no result was returned or
+ * the RPC has not yet completed.
+ */
+ public T getResult() {
+ return result;
+ }
+
+ /**
+ * Gets the error that was thrown during RPC execution. Does not block.
+ * Either {@link #get()} or {@link #get(long, TimeUnit)} should be called
+ * first because these methods block until the RPC has completed.
+ * @return the RPC error that was thrown, or null if no error has occurred or
+ * if the RPC has not yet completed.
+ */
+ public Throwable getError() {
+ return error;
+ }
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ return false;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return false;
+ }
+
+ @Override
+ public T get() throws InterruptedException,
+ ExecutionException {
+ latch.await();
+ if (error != null) {
+ throw new ExecutionException(error);
+ }
+ return result;
+ }
+
+ @Override
+ public T get(long timeout, TimeUnit unit)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ if (latch.await(timeout, unit)) {
+ if (error != null) {
+ throw new ExecutionException(error);
+ }
+ return result;
+ } else {
+ throw new TimeoutException();
+ }
+ }
+
+ /**
+ * Waits for the CallFuture to complete without returning the result.
+ * @throws InterruptedException if interrupted.
+ */
+ public void await() throws InterruptedException {
+ latch.await();
+ }
+
+ /**
+ * Waits for the CallFuture to complete without returning the result.
+ * @param timeout the maximum time to wait.
+ * @param unit the time unit of the timeout argument.
+ * @throws InterruptedException if interrupted.
+ * @throws TimeoutException if the wait timed out.
+ */
+ public void await(long timeout, TimeUnit unit)
+ throws InterruptedException, TimeoutException {
+ if (!latch.await(timeout, unit)) {
+ throw new TimeoutException();
+ }
+ }
+
+ @Override
+ public boolean isDone() {
+ return latch.getCount() <= 0;
+ }
+}
Added: avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Callback.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Callback.java?rev=1136342&view=auto
==============================================================================
--- avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Callback.java (added)
+++ avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Callback.java Thu Jun 16 09:33:29 2011
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.ipc;
+
+/**
+ * Interface for receiving asynchronous callbacks.
+ * For each request with an asynchronous callback,
+ * either {@link #handleResult(Object)} or {@link #handleError(Exception)}
+ * will be invoked.
+ */
+public interface Callback<T> {
+ /**
+ * Receives a callback result.
+ * @param result the result returned in the callback.
+ */
+ void handleResult(T result);
+
+ /**
+ * Receives an error.
+ * @param error the error returned in the callback.
+ */
+ void handleError(Throwable error);
+}
Modified: avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyServer.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyServer.java?rev=1136342&r1=1136341&r2=1136342&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyServer.java (original)
+++ avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyServer.java Thu Jun 16 09:33:29 2011
@@ -54,13 +54,13 @@ public class NettyServer implements Serv
private static final Logger LOG = LoggerFactory.getLogger(NettyServer.class
.getName());
- private Responder responder;
+ private final Responder responder;
- private Channel serverChannel;
- private ChannelGroup allChannels = new DefaultChannelGroup(
+ private final Channel serverChannel;
+ private final ChannelGroup allChannels = new DefaultChannelGroup(
"avro-netty-server");
- private ChannelFactory channelFactory;
- private CountDownLatch closed = new CountDownLatch(1);
+ private final ChannelFactory channelFactory;
+ private final CountDownLatch closed = new CountDownLatch(1);
public NettyServer(Responder responder, InetSocketAddress addr) {
this.responder = responder;
@@ -140,10 +140,6 @@ public class NettyServer implements Serv
}
} catch (IOException ex) {
LOG.warn("unexpect error");
- } finally {
- if(!connectionMetadata.isConnected()){
- e.getChannel().close();
- }
}
}
Modified: avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java?rev=1136342&r1=1136341&r2=1136342&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java (original)
+++ avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java Thu Jun 16 09:33:29 2011
@@ -27,10 +27,8 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.avro.Protocol;
@@ -45,6 +43,7 @@ import org.jboss.netty.channel.ChannelFu
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.ChannelState;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
@@ -61,26 +60,39 @@ public class NettyTransceiver extends Tr
private static final Logger LOG = LoggerFactory.getLogger(NettyTransceiver.class
.getName());
- private ChannelFactory channelFactory;
- private Channel channel;
+ private final AtomicInteger serialGenerator = new AtomicInteger(0);
+ private final Map<Integer, Callback<List<ByteBuffer>>> requests =
+ new ConcurrentHashMap<Integer, Callback<List<ByteBuffer>>>();
- private AtomicInteger serialGenerator = new AtomicInteger(0);
- private Map<Integer, CallFuture> requests =
- new ConcurrentHashMap<Integer, CallFuture>();
+ private final ChannelFactory channelFactory;
+ private final ClientBootstrap bootstrap;
+ private final InetSocketAddress remoteAddr;
- private Protocol remote;
+ /**
+ * Read lock must be acquired whenever using non-final state.
+ * Write lock must be acquired whenever modifying state.
+ */
+ private final ReadWriteLock stateLock = new ReentrantReadWriteLock();
+ private boolean open = false; // Synchronized on stateLock
+ private Channel channel; // Synchronized on stateLock
+ private Protocol remote; // Synchronized on stateLock
+
+ NettyTransceiver() {
+ channelFactory = null;
+ bootstrap = null;
+ remoteAddr = null;
+ }
- NettyTransceiver() {}
-
public NettyTransceiver(InetSocketAddress addr) {
- this(addr, new NioClientSocketChannelFactory(Executors.
- newCachedThreadPool(), Executors.newCachedThreadPool()));
+ this(addr, new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
+ Executors.newCachedThreadPool()));
}
public NettyTransceiver(InetSocketAddress addr, ChannelFactory channelFactory) {
// Set up.
this.channelFactory = channelFactory;
- ClientBootstrap bootstrap = new ClientBootstrap(channelFactory);
+ bootstrap = new ClientBootstrap(channelFactory);
+ remoteAddr = addr;
// Configure the event pipeline factory.
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@@ -97,54 +109,160 @@ public class NettyTransceiver extends Tr
bootstrap.setOption("tcpNoDelay", true);
// Make a new connection.
- ChannelFuture channelFuture = bootstrap.connect(addr);
- channelFuture.awaitUninterruptibly();
- if (!channelFuture.isSuccess()) {
- channelFuture.getCause().printStackTrace();
- throw new RuntimeException(channelFuture.getCause());
+ connect();
+ }
+
+ /**
+ * Connects to the remote peer if not already connected.
+ */
+ private void connect() {
+ stateLock.writeLock().lock();
+ try {
+ if (!open || (channel == null) || !channel.isOpen() || !channel.isBound() || !channel.isConnected()) {
+ LOG.info("Connecting to " + remoteAddr);
+ ChannelFuture channelFuture = bootstrap.connect(remoteAddr);
+ channelFuture.awaitUninterruptibly();
+ if (!channelFuture.isSuccess()) {
+ channelFuture.getCause().printStackTrace();
+ throw new RuntimeException(channelFuture.getCause());
+ }
+ channel = channelFuture.getChannel();
+ open = true;
+ }
+ } finally {
+ stateLock.writeLock().unlock();
+ }
+ }
+
+ /**
+ * Closes the connection to the remote peer if connected.
+ */
+ private void disconnect() {
+ disconnect(false);
+ }
+
+ /**
+ * Closes the connection to the remote peer if connected.
+ * @param awaitCompletion if true, will block until the close has completed.
+ */
+ private void disconnect(boolean awaitCompletion) {
+ stateLock.writeLock().lock();
+ try {
+ if (channel != null) {
+ LOG.info("Disconnecting from " + remoteAddr);
+ ChannelFuture closeFuture = channel.close();
+ if (awaitCompletion) {
+ closeFuture.awaitUninterruptibly();
+ }
+ channel = null;
+ remote = null;
+ open = false;
+ }
+ } finally {
+ stateLock.writeLock().unlock();
}
- channel = channelFuture.getChannel();
+ }
+
+ /**
+ * Netty channels are thread-safe, so there is no need to acquire locks.
+ * This method is a no-op.
+ */
+ @Override
+ public void lockChannel() {
+
+ }
+
+ /**
+ * Netty channels are thread-safe, so there is no need to acquire locks.
+ * This method is a no-op.
+ */
+ @Override
+ public void unlockChannel() {
+
}
public void close() {
- // Close the connection.
- channel.close().awaitUninterruptibly();
- // Shut down all thread pools to exit.
- channelFactory.releaseExternalResources();
+ stateLock.writeLock().lock();
+ try {
+ // Close the connection.
+ disconnect(true);
+ // Shut down all thread pools to exit.
+ channelFactory.releaseExternalResources();
+ } finally {
+ stateLock.writeLock().unlock();
+ }
}
@Override
public String getRemoteName() {
- return channel.getRemoteAddress().toString();
+ stateLock.readLock().lock();
+ try {
+ return channel.getRemoteAddress().toString();
+ } finally {
+ stateLock.readLock().unlock();
+ }
}
/**
* Override as non-synchronized method because the method is thread safe.
*/
@Override
- public List<ByteBuffer> transceive(List<ByteBuffer> request)
- throws IOException {
- int serial = serialGenerator.incrementAndGet();
- NettyDataPack dataPack = new NettyDataPack(serial, request);
- CallFuture callFuture = new CallFuture();
- requests.put(serial, callFuture);
- channel.write(dataPack);
+ public List<ByteBuffer> transceive(List<ByteBuffer> request) {
try {
- return callFuture.get();
+ CallFuture<List<ByteBuffer>> transceiverFuture = new CallFuture<List<ByteBuffer>>();
+ transceive(request, transceiverFuture);
+ return transceiverFuture.get();
} catch (InterruptedException e) {
LOG.warn("failed to get the response", e);
return null;
} catch (ExecutionException e) {
LOG.warn("failed to get the response", e);
return null;
+ }
+ }
+
+ @Override
+ public void transceive(List<ByteBuffer> request, Callback<List<ByteBuffer>> callback) {
+ stateLock.readLock().lock();
+ try {
+ int serial = serialGenerator.incrementAndGet();
+ NettyDataPack dataPack = new NettyDataPack(serial, request);
+ requests.put(serial, callback);
+ writeDataPack(dataPack);
} finally {
- requests.remove(serial);
+ stateLock.readLock().unlock();
}
}
-
+
@Override
public void writeBuffers(List<ByteBuffer> buffers) throws IOException {
- channel.write(new NettyDataPack(serialGenerator.incrementAndGet(), buffers));
+ writeDataPack(new NettyDataPack(serialGenerator.incrementAndGet(), buffers));
+ }
+
+ /**
+ * Writes a NettyDataPack, reconnecting to the remote peer if necessary.
+ * @param dataPack the data pack to write.
+ */
+ private void writeDataPack(NettyDataPack dataPack) {
+ stateLock.readLock().lock();
+ try {
+ while ((channel == null) || !channel.isOpen() || !channel.isBound() || !channel.isConnected()) {
+ // Need to reconnect
+ // Upgrade to write lock
+ stateLock.readLock().unlock();
+ stateLock.writeLock().lock();
+ try {
+ connect();
+ } finally {
+ // Downgrade to read lock:
+ stateLock.readLock().lock();
+ stateLock.writeLock().unlock();
+ }
+ }
+ channel.write(dataPack);
+ } finally {
+ stateLock.readLock().unlock();
+ }
}
@Override
@@ -154,71 +272,32 @@ public class NettyTransceiver extends Tr
@Override
public Protocol getRemote() {
- return remote;
+ stateLock.readLock().lock();
+ try {
+ return remote;
+ } finally {
+ stateLock.readLock().unlock();
+ }
}
@Override
public boolean isConnected() {
- return remote!=null;
+ stateLock.readLock().lock();
+ try {
+ return remote!=null;
+ } finally {
+ stateLock.readLock().unlock();
+ }
}
@Override
public void setRemote(Protocol protocol) {
- this.remote = protocol;
- }
-
- /**
- * Future class for a RPC call
- */
- class CallFuture implements Future<List<ByteBuffer>>{
- private Semaphore sem = new Semaphore(0);
- private List<ByteBuffer> response = null;
-
- public void setResponse(List<ByteBuffer> response) {
- this.response = response;
- sem.release();
- }
-
- public void releaseSemphore() {
- sem.release();
- }
-
- public List<ByteBuffer> getResponse() {
- return response;
- }
-
- @Override
- public boolean cancel(boolean mayInterruptIfRunning) {
- return false;
- }
-
- @Override
- public boolean isCancelled() {
- return false;
- }
-
- @Override
- public List<ByteBuffer> get() throws InterruptedException,
- ExecutionException {
- sem.acquire();
- return response;
- }
-
- @Override
- public List<ByteBuffer> get(long timeout, TimeUnit unit)
- throws InterruptedException, ExecutionException, TimeoutException {
- if (sem.tryAcquire(timeout, unit)) {
- return response;
- } else {
- throw new TimeoutException();
- }
- }
-
- @Override
- public boolean isDone() {
- return sem.availablePermits()>0;
+ stateLock.writeLock().lock();
+ try {
+ this.remote = protocol;
+ } finally {
+ stateLock.writeLock().unlock();
}
-
}
/**
@@ -231,6 +310,33 @@ public class NettyTransceiver extends Tr
throws Exception {
if (e instanceof ChannelStateEvent) {
LOG.info(e.toString());
+ ChannelStateEvent cse = (ChannelStateEvent)e;
+ if ((cse.getState() == ChannelState.OPEN) && (Boolean.FALSE.equals(cse.getValue()))) {
+ // Server closed connection; disconnect client side
+ LOG.info("Remote peer " + remoteAddr + " closed connection.");
+ stateLock.readLock().lock();
+ boolean readLockAcquired = true;
+ try {
+ // Only disconnect if open to prevent deadlock on close()
+ if (open) {
+ // Upgrade to write lock:
+ stateLock.readLock().unlock();
+ readLockAcquired = false;
+ stateLock.writeLock().lock();
+ try {
+ if (open) {
+ disconnect();
+ }
+ } finally {
+ stateLock.writeLock().unlock();
+ }
+ }
+ } finally {
+ if (readLockAcquired) {
+ stateLock.readLock().unlock();
+ }
+ }
+ }
}
super.handleUpstream(ctx, e);
}
@@ -245,11 +351,15 @@ public class NettyTransceiver extends Tr
@Override
public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e) {
NettyDataPack dataPack = (NettyDataPack)e.getMessage();
- CallFuture callFuture = requests.get(dataPack.getSerial());
- if (callFuture==null) {
+ Callback<List<ByteBuffer>> callback = requests.get(dataPack.getSerial());
+ if (callback==null) {
throw new RuntimeException("Missing previous call info");
}
- callFuture.setResponse(dataPack.getDatas());
+ try {
+ callback.handleResult(dataPack.getDatas());
+ } finally {
+ requests.remove(dataPack.getSerial());
+ }
}
@Override
@@ -257,9 +367,9 @@ public class NettyTransceiver extends Tr
LOG.warn("Unexpected exception from downstream.", e.getCause());
e.getChannel().close();
// let the blocking waiting exit
- Iterator<CallFuture> it = requests.values().iterator();
+ Iterator<Callback<List<ByteBuffer>>> it = requests.values().iterator();
while(it.hasNext()) {
- it.next().releaseSemphore();
+ it.next().handleError(e.getCause());
it.remove();
}
Modified: avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Requestor.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Requestor.java?rev=1136342&r1=1136341&r2=1136342&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Requestor.java (original)
+++ avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Requestor.java Thu Jun 16 09:33:29 2011
@@ -20,9 +20,11 @@ package org.apache.avro.ipc;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.List;
import java.util.Map;
@@ -57,12 +59,13 @@ public abstract class Requestor {
private static final GenericDatumWriter<Map<CharSequence,ByteBuffer>>
META_WRITER = new GenericDatumWriter<Map<CharSequence,ByteBuffer>>(META);
- private Protocol local;
- private Protocol remote;
- private boolean sendLocalText;
- private Transceiver transceiver;
+ private final Protocol local;
+ private volatile Protocol remote;
+ private volatile boolean sendLocalText;
+ private final Transceiver transceiver;
+ private final ReentrantLock handshakeLock = new ReentrantLock();
- protected List<RPCPlugin> rpcMetaPlugins;
+ protected final List<RPCPlugin> rpcMetaPlugins;
public Protocol getLocal() { return local; }
public Transceiver getTransceiver() { return transceiver; }
@@ -72,7 +75,7 @@ public abstract class Requestor {
this.local = local;
this.transceiver = transceiver;
this.rpcMetaPlugins =
- Collections.synchronizedList(new ArrayList<RPCPlugin>());
+ new CopyOnWriteArrayList<RPCPlugin>();
}
/**
@@ -85,93 +88,96 @@ public abstract class Requestor {
}
private static final EncoderFactory ENCODER_FACTORY = new EncoderFactory();
- private BinaryEncoder encoder =
- ENCODER_FACTORY.binaryEncoder(new ByteBufferOutputStream(), null);
/** Writes a request message and reads a response or error message. */
- public synchronized Object request(String messageName, Object request)
+ public Object request(String messageName, Object request)
throws Exception {
- Transceiver t = getTransceiver();
- BinaryDecoder in = null;
- Message m;
- RPCContext context = new RPCContext();
- do {
- ByteBufferOutputStream bbo = new ByteBufferOutputStream();
- //safe to use encoder because this is synchronized
- BinaryEncoder out = ENCODER_FACTORY.binaryEncoder(bbo, encoder);
-
- // use local protocol to write request
- m = getLocal().getMessages().get(messageName);
- if (m == null)
- throw new AvroRuntimeException("Not a local message: "+messageName);
- context.setMessage(m);
+ // Initialize request
+ Request rpcRequest = new Request(messageName, request, new RPCContext());
+ CallFuture<Object> future = /* only need a Future for two-way messages */
+ rpcRequest.getMessage().isOneWay() ? null : new CallFuture<Object>();
- writeRequest(m.getRequest(), request, out); // write request payload
-
- out.flush();
- List<ByteBuffer> payload = bbo.getBufferList();
-
- writeHandshake(out); // prepend handshake if needed
-
- context.setRequestPayload(payload);
- for (RPCPlugin plugin : rpcMetaPlugins) {
- plugin.clientSendRequest(context); // get meta-data from plugins
+ // Send request
+ request(rpcRequest, future);
+
+ if (future == null) // the message is one-way, so return immediately
+ return null;
+ try { // the message is two-way, wait for the result
+ return future.get();
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof Exception) {
+ throw (Exception)e.getCause();
+ } else {
+ throw new RuntimeException(e.getCause());
}
- META_WRITER.write(context.requestCallMeta(), out);
-
- out.writeString(m.getName()); // write message name
-
- out.flush();
- bbo.append(payload);
-
- List<ByteBuffer> requestBytes = bbo.getBufferList();
-
- if (m.isOneWay() && t.isConnected()) { // send one-way message
- t.writeBuffers(requestBytes);
-
- return null;
- } else { // two-way message
- List<ByteBuffer> response = t.transceive(requestBytes);
- ByteBufferInputStream bbi = new ByteBufferInputStream(response);
- in = DecoderFactory.get().binaryDecoder(bbi, in);
+ }
+ }
+
+ /**
+ * Writes a request message and returns the result through a Callback.
+ * Clients can also use a Future interface by creating a new CallFuture<T>,
+ * passing it in as the Callback parameter, and then waiting on that Future.
+ * @param <T> the return type of the message.
+ * @param messageName the name of the message to invoke.
+ * @param request the request data to send.
+ * @param callback the callback which will be invoked when the response is returned
+ * or an error occurs.
+ * @throws Exception if an error occurs sending the message.
+ */
+ public <T> void request(String messageName, Object request, Callback<T> callback)
+ throws Exception {
+ request(new Request(messageName, request, new RPCContext()), callback);
+ }
+
+ /** Writes a request message and returns the result through a Callback. */
+ <T> void request(Request request, Callback<T> callback)
+ throws Exception {
+ Transceiver t = getTransceiver();
+ if (!t.isConnected()) {
+ // Acquire handshake lock so that only one thread is performing the
+ // handshake and other threads block until the handshake is completed
+ handshakeLock.lock();
+ try {
+ if (t.isConnected()) {
+ // Another thread already completed the handshake; no need to hold
+ // the write lock
+ handshakeLock.unlock();
+ } else {
+ CallFuture<T> callFuture = new CallFuture<T>(callback);
+ t.transceive(request.getBytes(),
+ new TransceiverCallback<T>(request, callFuture));
+ // Block until handshake complete
+ callFuture.await();
+ return;
+ }
+ } finally{
+ if (handshakeLock.isHeldByCurrentThread()) {
+ handshakeLock.unlock();
+ }
}
- } while (!readHandshake(in));
-
- // use remote protocol to read response
- Message rm = remote.getMessages().get(messageName);
- if (rm == null)
- throw new AvroRuntimeException("Not a remote message: "+messageName);
-
- if ((m.isOneWay() != rm.isOneWay()) && t.isConnected())
- throw new AvroRuntimeException("Not both one-way messages: "+messageName);
-
- if (m.isOneWay() && t.isConnected()) return null; // one-way w/ handshake
-
- context.setResponseCallMeta(META_READER.read(null, in));
-
- if (!in.readBoolean()) { // no error
- Object response = readResponse(rm.getResponse(), m.getResponse(), in);
- context.setResponse(response);
- for (RPCPlugin plugin : rpcMetaPlugins) {
- plugin.clientReceiveResponse(context);
+ }
+
+ if (request.getMessage().isOneWay()) {
+ t.lockChannel();
+ try {
+ t.writeBuffers(request.getBytes());
+ if (callback != null) {
+ callback.handleResult(null);
+ }
+ } finally {
+ t.unlockChannel();
}
- return response;
-
} else {
- Exception error = readError(rm.getErrors(), m.getErrors(), in);
- context.setError(error);
- for (RPCPlugin plugin : rpcMetaPlugins) {
- plugin.clientReceiveResponse(context);
- }
- throw error;
+ t.transceive(request.getBytes(),
+ new TransceiverCallback<T>(request, callback));
}
}
- private static final Map<String,MD5> REMOTE_HASHES =
- Collections.synchronizedMap(new HashMap<String,MD5>());
- private static final Map<MD5,Protocol> REMOTE_PROTOCOLS =
- Collections.synchronizedMap(new HashMap<MD5,Protocol>());
+ private static final ConcurrentMap<String,MD5> REMOTE_HASHES =
+ new ConcurrentHashMap<String,MD5>();
+ private static final ConcurrentMap<MD5,Protocol> REMOTE_PROTOCOLS =
+ new ConcurrentHashMap<MD5,Protocol>();
private static final SpecificDatumWriter<HandshakeRequest> HANDSHAKE_WRITER =
new SpecificDatumWriter<HandshakeRequest>(HandshakeRequest.class);
@@ -185,10 +191,11 @@ public abstract class Requestor {
localHash.bytes(local.getMD5());
String remoteName = transceiver.getRemoteName();
MD5 remoteHash = REMOTE_HASHES.get(remoteName);
- remote = REMOTE_PROTOCOLS.get(remoteHash);
if (remoteHash == null) { // guess remote is local
remoteHash = localHash;
remote = local;
+ } else {
+ remote = REMOTE_PROTOCOLS.get(remoteHash);
}
HandshakeRequest handshake = new HandshakeRequest();
handshake.clientHash = localHash;
@@ -244,30 +251,36 @@ public abstract class Requestor {
remote = Protocol.parse(handshake.serverProtocol.toString());
MD5 remoteHash = (MD5)handshake.serverHash;
REMOTE_HASHES.put(transceiver.getRemoteName(), remoteHash);
- if (!REMOTE_PROTOCOLS.containsKey(remoteHash))
- REMOTE_PROTOCOLS.put(remoteHash, remote);
+ REMOTE_PROTOCOLS.putIfAbsent(remoteHash, remote);
}
/** Return the remote protocol. Force a handshake if required. */
- public synchronized Protocol getRemote() throws IOException {
+ public Protocol getRemote() throws IOException {
if (remote != null) return remote; // already have it
MD5 remoteHash = REMOTE_HASHES.get(transceiver.getRemoteName());
- remote = REMOTE_PROTOCOLS.get(remoteHash);
- if (remote != null) return remote; // already cached
- // force handshake
- ByteBufferOutputStream bbo = new ByteBufferOutputStream();
- // direct because the payload is tiny.
- Encoder out = ENCODER_FACTORY.directBinaryEncoder(bbo, null);
- writeHandshake(out);
- out.writeInt(0); // empty metadata
- out.writeString(""); // bogus message name
- List<ByteBuffer> response =
- getTransceiver().transceive(bbo.getBufferList());
- ByteBufferInputStream bbi = new ByteBufferInputStream(response);
- BinaryDecoder in =
- DecoderFactory.get().binaryDecoder(bbi, null);
- readHandshake(in);
- return this.remote;
+ if (remoteHash != null) {
+ remote = REMOTE_PROTOCOLS.get(remoteHash);
+ if (remote != null) return remote; // already cached
+ }
+ handshakeLock.lock();
+ try {
+ // force handshake
+ ByteBufferOutputStream bbo = new ByteBufferOutputStream();
+ // direct because the payload is tiny.
+ Encoder out = ENCODER_FACTORY.directBinaryEncoder(bbo, null);
+ writeHandshake(out);
+ out.writeInt(0); // empty metadata
+ out.writeString(""); // bogus message name
+ List<ByteBuffer> response =
+ getTransceiver().transceive(bbo.getBufferList());
+ ByteBufferInputStream bbi = new ByteBufferInputStream(response);
+ BinaryDecoder in =
+ DecoderFactory.get().binaryDecoder(bbi, null);
+ readHandshake(in);
+ return this.remote;
+ } finally {
+ handshakeLock.unlock();
+ }
}
@@ -292,5 +305,249 @@ public abstract class Requestor {
/** Reads an error message. */
public abstract Exception readError(Schema writer, Schema reader, Decoder in)
throws IOException;
-}
+
+ /**
+ * Handles callbacks from transceiver invocations.
+ */
+ protected class TransceiverCallback<T> implements Callback<List<ByteBuffer>> {
+ private final Request request;
+ private final Callback<T> callback;
+
+ /**
+ * Creates a TransceiverCallback.
+ * @param request the request to set.
+ * @param callback the callback to set.
+ */
+ public TransceiverCallback(Request request, Callback<T> callback) {
+ this.request = request;
+ this.callback = callback;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public void handleResult(List<ByteBuffer> responseBytes) {
+ ByteBufferInputStream bbi = new ByteBufferInputStream(responseBytes);
+ BinaryDecoder in = DecoderFactory.get().binaryDecoder(bbi, null);
+ try {
+ if (!readHandshake(in)) {
+ // Resend the handshake and return
+ Request handshake = new Request(request);
+ getTransceiver().transceive
+ (handshake.getBytes(),
+ new TransceiverCallback<T>(handshake, callback));
+ return;
+ }
+ } catch (Exception e) {
+ LOG.error("Error handling transceiver callback: " + e, e);
+ }
+
+ // Read response; invoke callback
+ Response response = new Response(request, in);
+ Object responseObject;
+ try {
+ try {
+ responseObject = response.getResponse();
+ } catch (Exception e) {
+ if (callback != null) {
+ callback.handleError(e);
+ }
+ return;
+ }
+ if (callback != null) {
+ callback.handleResult((T)responseObject);
+ }
+ } catch (Throwable t) {
+ LOG.error("Error in callback handler: " + t, t);
+ }
+ }
+
+ @Override
+ public void handleError(Throwable error) {
+ callback.handleError(error);
+ }
+ }
+
+ /**
+ * Encapsulates/generates a request.
+ */
+ class Request {
+ private final String messageName;
+ private final Object request;
+ private final RPCContext context;
+ private final BinaryEncoder encoder;
+ private Message message;
+ private List<ByteBuffer> requestBytes;
+
+ /**
+ * Creates a Request.
+ * @param messageName the name of the message to invoke.
+ * @param request the request data to send.
+ * @param context the RPC context to use.
+ */
+ public Request(String messageName, Object request, RPCContext context) {
+ this(messageName, request, context, null);
+ }
+
+ /**
+ * Creates a Request.
+ * @param messageName the name of the message to invoke.
+ * @param request the request data to send.
+ * @param context the RPC context to use.
+ * @param encoder the BinaryEncoder to use to serialize the request.
+ */
+ public Request(String messageName, Object request, RPCContext context,
+ BinaryEncoder encoder) {
+ this.messageName = messageName;
+ this.request = request;
+ this.context = context;
+ this.encoder =
+ ENCODER_FACTORY.binaryEncoder(new ByteBufferOutputStream(), encoder);
+ }
+
+ /**
+ * Copy constructor.
+ * @param other Request from which to copy fields.
+ */
+ public Request(Request other) {
+ this.messageName = other.messageName;
+ this.request = other.request;
+ this.context = other.context;
+ this.encoder = other.encoder;
+ }
+
+ /**
+ * Gets the message name.
+ * @return the message name.
+ */
+ public String getMessageName() {
+ return messageName;
+ }
+
+ /**
+ * Gets the RPC context.
+ * @return the RPC context.
+ */
+ public RPCContext getContext() {
+ return context;
+ }
+
+ /**
+ * Gets the Message associated with this request.
+ * @return this request's message.
+ */
+ public Message getMessage() {
+ if (message == null) {
+ message = getLocal().getMessages().get(messageName);
+ if (message == null) {
+ throw new AvroRuntimeException("Not a local message: "+messageName);
+ }
+ }
+ return message;
+ }
+
+ /**
+ * Gets the request data, generating it first if necessary.
+ * @return the request data.
+ * @throws Exception if an error occurs generating the request data.
+ */
+ public List<ByteBuffer> getBytes()
+ throws Exception {
+ if (requestBytes == null) {
+ ByteBufferOutputStream bbo = new ByteBufferOutputStream();
+ BinaryEncoder out = ENCODER_FACTORY.binaryEncoder(bbo, encoder);
+
+ // use local protocol to write request
+ Message m = getMessage();
+ context.setMessage(m);
+
+ writeRequest(m.getRequest(), request, out); // write request payload
+
+ out.flush();
+ List<ByteBuffer> payload = bbo.getBufferList();
+
+ writeHandshake(out); // prepend handshake if needed
+
+ context.setRequestPayload(payload);
+ for (RPCPlugin plugin : rpcMetaPlugins) {
+ plugin.clientSendRequest(context); // get meta-data from plugins
+ }
+ META_WRITER.write(context.requestCallMeta(), out);
+ out.writeString(m.getName()); // write message name
+
+ out.flush();
+ bbo.append(payload);
+
+ requestBytes = bbo.getBufferList();
+ }
+ return requestBytes;
+ }
+ }
+
+ /**
+ * Encapsulates/parses a response.
+ */
+ class Response {
+ private final Request request;
+ private final BinaryDecoder in;
+
+ /**
+ * Creates a Response.
+ * @param request the Request associated with this response.
+ */
+ public Response(Request request) {
+ this(request, null);
+ }
+
+ /**
+ * Creates a Creates a Response.
+ * @param request the Request associated with this response.
+ * @param in the BinaryDecoder to use to deserialize the response.
+ */
+ public Response(Request request, BinaryDecoder in) {
+ this.request = request;
+ this.in = in;
+ }
+
+ /**
+ * Gets the RPC response, reading/deserializing it first if necessary.
+ * @return the RPC response.
+ * @throws Exception if an error occurs reading/deserializing the response.
+ */
+ public Object getResponse()
+ throws Exception {
+ Message lm = request.getMessage();
+ Message rm = remote.getMessages().get(request.getMessageName());
+ if (rm == null)
+ throw new AvroRuntimeException
+ ("Not a remote message: "+request.getMessageName());
+
+ Transceiver t = getTransceiver();
+ if ((lm.isOneWay() != rm.isOneWay()) && t.isConnected())
+ throw new AvroRuntimeException
+ ("Not both one-way messages: "+request.getMessageName());
+
+ if (lm.isOneWay() && t.isConnected()) return null; // one-way w/ handshake
+
+ RPCContext context = request.getContext();
+ context.setResponseCallMeta(META_READER.read(null, in));
+
+ if (!in.readBoolean()) { // no error
+ Object response = readResponse(rm.getResponse(), lm.getResponse(), in);
+ context.setResponse(response);
+ for (RPCPlugin plugin : rpcMetaPlugins) {
+ plugin.clientReceiveResponse(context);
+ }
+ return response;
+
+ } else {
+ Exception error = readError(rm.getErrors(), lm.getErrors(), in);
+ context.setError(error);
+ for (RPCPlugin plugin : rpcMetaPlugins) {
+ plugin.clientReceiveResponse(context);
+ }
+ throw error;
+ }
+ }
+ }
+}
Modified: avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Responder.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Responder.java?rev=1136342&r1=1136341&r2=1136342&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Responder.java (original)
+++ avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Responder.java Thu Jun 16 09:33:29 2011
@@ -21,9 +21,8 @@ package org.apache.avro.ipc;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.List;
import java.util.Map;
@@ -62,12 +61,12 @@ public abstract class Responder {
private static final ThreadLocal<Protocol> REMOTE =
new ThreadLocal<Protocol>();
- private Map<MD5,Protocol> protocols
- = Collections.synchronizedMap(new HashMap<MD5,Protocol>());
+ private final Map<MD5,Protocol> protocols
+ = new ConcurrentHashMap<MD5,Protocol>();
- private Protocol local;
- private MD5 localHash;
- protected List<RPCPlugin> rpcMetaPlugins;
+ private final Protocol local;
+ private final MD5 localHash;
+ protected final List<RPCPlugin> rpcMetaPlugins;
protected Responder(Protocol local) {
this.local = local;
@@ -75,7 +74,7 @@ public abstract class Responder {
localHash.bytes(local.getMD5());
protocols.put(localHash, local);
this.rpcMetaPlugins =
- Collections.synchronizedList(new ArrayList<RPCPlugin>());
+ new CopyOnWriteArrayList<RPCPlugin>();
}
/** Return the remote protocol. Accesses a {@link ThreadLocal} that's set
Modified: avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Transceiver.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Transceiver.java?rev=1136342&r1=1136341&r2=1136342&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Transceiver.java (original)
+++ avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Transceiver.java Thu Jun 16 09:33:29 2011
@@ -22,21 +22,58 @@ import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.avro.Protocol;
/** Base transport class used by {@link Requestor}. */
public abstract class Transceiver implements Closeable {
+ private final ReentrantLock channelLock = new ReentrantLock();
public abstract String getRemoteName();
+
+ /**
+ * Acquires an exclusive lock on the transceiver's channel.
+ */
+ public void lockChannel() {
+ channelLock.lock();
+ }
+
+ /**
+ * Releases the lock on the transceiver's channel if held by the calling thread.
+ */
+ public void unlockChannel() {
+ if (channelLock.isHeldByCurrentThread()) {
+ channelLock.unlock();
+ }
+ }
/** Called by {@link Requestor#request(String,Object)} for two-way messages.
* By default calls {@link #writeBuffers(List)} followed by
* {@link #readBuffers()}. */
- public synchronized List<ByteBuffer> transceive(List<ByteBuffer> request)
+ public List<ByteBuffer> transceive(List<ByteBuffer> request)
+ throws IOException {
+ lockChannel();
+ try {
+ writeBuffers(request);
+ return readBuffers();
+ } finally {
+ unlockChannel();
+ }
+ }
+
+ /**
+ * Called by {@link Requestor#request(String,Object,Callback)} for two-way messages using callbacks.
+ */
+ public void transceive(List<ByteBuffer> request, Callback<List<ByteBuffer>> callback)
throws IOException {
- writeBuffers(request);
- return readBuffers();
+ // The default implementation works synchronously
+ try {
+ List<ByteBuffer> response = transceive(request);
+ callback.handleResult(response);
+ } catch (IOException e) {
+ callback.handleError(e);
+ }
}
/** Called by the default definition of {@link #transceive(List)}.*/
Modified: avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/generic/GenericRequestor.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/generic/GenericRequestor.java?rev=1136342&r1=1136341&r2=1136342&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/generic/GenericRequestor.java (original)
+++ avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/generic/GenericRequestor.java Thu Jun 16 09:33:29 2011
@@ -28,6 +28,7 @@ import org.apache.avro.generic.GenericDa
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.Encoder;
+import org.apache.avro.ipc.Callback;
import org.apache.avro.ipc.Requestor;
import org.apache.avro.ipc.Transceiver;
@@ -53,6 +54,20 @@ public class GenericRequestor extends Re
}
@Override
+ public <T> void request(String messageName, Object request, Callback<T> callback)
+ throws IOException {
+ try {
+ super.request(messageName, request, callback);
+ } catch (Exception e) {
+ if (e instanceof RuntimeException)
+ throw (RuntimeException)e;
+ if (e instanceof IOException)
+ throw (IOException)e;
+ throw new AvroRemoteException(e);
+ }
+ }
+
+ @Override
public void writeRequest(Schema schema, Object request, Encoder out)
throws IOException {
new GenericDatumWriter<Object>(schema).write(request, out);
Modified: avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/specific/SpecificRequestor.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/specific/SpecificRequestor.java?rev=1136342&r1=1136341&r2=1136342&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/specific/SpecificRequestor.java (original)
+++ avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/specific/SpecificRequestor.java Thu Jun 16 09:33:29 2011
@@ -22,6 +22,8 @@ import java.io.IOException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Type;
+import java.util.Arrays;
import org.apache.avro.Protocol;
import org.apache.avro.Schema;
@@ -32,6 +34,7 @@ import org.apache.avro.io.Decoder;
import org.apache.avro.io.Encoder;
import org.apache.avro.ipc.Transceiver;
import org.apache.avro.ipc.Requestor;
+import org.apache.avro.ipc.Callback;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
@@ -52,7 +55,20 @@ public class SpecificRequestor extends R
@Override
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
- return request(method.getName(), args);
+ // Check if this is a callback-based RPC:
+ Type[] parameterTypes = method.getParameterTypes();
+ if ((parameterTypes.length > 0) &&
+ (parameterTypes[parameterTypes.length - 1] instanceof Class) &&
+ Callback.class.isAssignableFrom(((Class<?>)parameterTypes[parameterTypes.length - 1]))) {
+ // Extract the Callback from the end of of the argument list
+ Object[] finalArgs = Arrays.copyOf(args, args.length - 1);
+ Callback<?> callback = (Callback<?>)args[args.length - 1];
+ request(method.getName(), finalArgs, callback);
+ return null;
+ }
+ else {
+ return request(method.getName(), args);
+ }
}
protected DatumWriter<Object> getDatumWriter(Schema schema) {
Added: avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolNetty.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolNetty.java?rev=1136342&view=auto
==============================================================================
--- avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolNetty.java (added)
+++ avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolNetty.java Thu Jun 16 09:33:29 2011
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro;
+
+import java.net.InetSocketAddress;
+
+import org.apache.avro.ipc.NettyServer;
+import org.apache.avro.ipc.NettyTransceiver;
+import org.apache.avro.ipc.Responder;
+import org.apache.avro.ipc.Server;
+import org.apache.avro.ipc.Transceiver;
+
+/**
+ * Protocol test with Netty server and transceiver
+ */
+public class TestProtocolNetty extends TestProtocolSpecific {
+ @Override
+ public Server createServer(Responder testResponder) throws Exception {
+ return new NettyServer(responder, new InetSocketAddress(0));
+ }
+
+ @Override
+ public Transceiver createTransceiver() throws Exception{
+ return new NettyTransceiver(new InetSocketAddress(server.getPort()));
+ }
+
+ @Override
+ protected int getExpectedHandshakeCount() {
+ return REPEATING;
+ }
+}
Added: avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCallbacks.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCallbacks.java?rev=1136342&view=auto
==============================================================================
--- avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCallbacks.java (added)
+++ avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCallbacks.java Thu Jun 16 09:33:29 2011
@@ -0,0 +1,345 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.ipc;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.avro.AvroRemoteException;
+import org.apache.avro.ipc.specific.SpecificRequestor;
+import org.apache.avro.ipc.specific.SpecificResponder;
+import org.apache.avro.test.Simple;
+import org.apache.avro.test.TestError;
+import org.apache.avro.test.TestRecord;
+import org.apache.avro.util.Utf8;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Tests asynchronous RPCs with Netty.
+ */
+public class TestNettyServerWithCallbacks {
+ private static Server server;
+ private static Transceiver transceiver;
+ private static Simple.Callback simpleClient;
+ private static final AtomicBoolean ackFlag = new AtomicBoolean(false);
+ private static final AtomicReference<CountDownLatch> ackLatch =
+ new AtomicReference<CountDownLatch>(new CountDownLatch(1));
+ private static Simple simpleService = new SimpleImpl(ackFlag);
+
+ @BeforeClass
+ public static void initializeConnections() throws Exception {
+ // start server
+ Responder responder = new SpecificResponder(Simple.class, simpleService);
+ server = new NettyServer(responder, new InetSocketAddress(0));
+ server.start();
+
+ int serverPort = server.getPort();
+ System.out.println("server port : " + serverPort);
+
+ transceiver = new NettyTransceiver(new InetSocketAddress(
+ serverPort));
+ simpleClient = SpecificRequestor.getClient(Simple.Callback.class, transceiver);
+ }
+
+ @AfterClass
+ public static void tearDownConnections() throws Exception {
+ if (transceiver != null) {
+ transceiver.close();
+ }
+ if (server != null) {
+ server.close();
+ }
+ }
+
+ @Test
+ public void greeting() throws Exception {
+ // Test synchronous RPC:
+ Assert.assertEquals(new Utf8("Hello, how are you?"), simpleClient.hello("how are you?"));
+
+ // Test asynchronous RPC (future):
+ CallFuture<CharSequence> future1 = new CallFuture<CharSequence>();
+ simpleClient.hello("World!", future1);
+ Assert.assertEquals(new Utf8("Hello, World!"), future1.get(2, TimeUnit.SECONDS));
+ Assert.assertNull(future1.getError());
+
+ // Test asynchronous RPC (callback):
+ final CallFuture<CharSequence> future2 = new CallFuture<CharSequence>();
+ simpleClient.hello("what's up?", new Callback<CharSequence>() {
+ @Override
+ public void handleResult(CharSequence result) {
+ future2.handleResult(result);
+ }
+ @Override
+ public void handleError(Throwable error) {
+ future2.handleError(error);
+ }
+ });
+ Assert.assertEquals(new Utf8("Hello, what's up?"), future2.get(2, TimeUnit.SECONDS));
+ Assert.assertNull(future2.getError());
+ }
+
+ @Test
+ public void echo() throws Exception {
+ TestRecord record = new TestRecord();
+ record.hash = new org.apache.avro.test.MD5();
+ record.hash.bytes(new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8 } );
+ record.kind = org.apache.avro.test.Kind.FOO;
+ record.name = "My Record";
+
+ // Test synchronous RPC:
+ Assert.assertEquals(record, simpleClient.echo(record));
+
+ // Test asynchronous RPC (future):
+ CallFuture<TestRecord> future1 = new CallFuture<TestRecord>();
+ simpleClient.echo(record, future1);
+ Assert.assertEquals(record, future1.get(2, TimeUnit.SECONDS));
+ Assert.assertNull(future1.getError());
+
+ // Test asynchronous RPC (callback):
+ final CallFuture<TestRecord> future2 = new CallFuture<TestRecord>();
+ simpleClient.echo(record, new Callback<TestRecord>() {
+ @Override
+ public void handleResult(TestRecord result) {
+ future2.handleResult(result);
+ }
+ @Override
+ public void handleError(Throwable error) {
+ future2.handleError(error);
+ }
+ });
+ Assert.assertEquals(record, future2.get(2, TimeUnit.SECONDS));
+ Assert.assertNull(future2.getError());
+ }
+
+ @Test
+ public void add() throws Exception {
+ // Test synchronous RPC:
+ Assert.assertEquals(8, simpleClient.add(2, 6));
+
+ // Test asynchronous RPC (future):
+ CallFuture<Integer> future1 = new CallFuture<Integer>();
+ simpleClient.add(8, 8, future1);
+ Assert.assertEquals(new Integer(16), future1.get(2, TimeUnit.SECONDS));
+ Assert.assertNull(future1.getError());
+
+ // Test asynchronous RPC (callback):
+ final CallFuture<Integer> future2 = new CallFuture<Integer>();
+ simpleClient.add(512, 256, new Callback<Integer>() {
+ @Override
+ public void handleResult(Integer result) {
+ future2.handleResult(result);
+ }
+ @Override
+ public void handleError(Throwable error) {
+ future2.handleError(error);
+ }
+ });
+ Assert.assertEquals(new Integer(768), future2.get(2, TimeUnit.SECONDS));
+ Assert.assertNull(future2.getError());
+ }
+
+ @Test
+ public void echoBytes() throws Exception {
+ ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[] { 1, 2, 3, 4, 5, 6, 7, 8 });
+
+ // Test synchronous RPC:
+ Assert.assertEquals(byteBuffer, simpleClient.echoBytes(byteBuffer));
+
+ // Test asynchronous RPC (future):
+ CallFuture<ByteBuffer> future1 = new CallFuture<ByteBuffer>();
+ simpleClient.echoBytes(byteBuffer, future1);
+ Assert.assertEquals(byteBuffer, future1.get(2, TimeUnit.SECONDS));
+ Assert.assertNull(future1.getError());
+
+ // Test asynchronous RPC (callback):
+ final CallFuture<ByteBuffer> future2 = new CallFuture<ByteBuffer>();
+ simpleClient.echoBytes(byteBuffer, new Callback<ByteBuffer>() {
+ @Override
+ public void handleResult(ByteBuffer result) {
+ future2.handleResult(result);
+ }
+ @Override
+ public void handleError(Throwable error) {
+ future2.handleError(error);
+ }
+ });
+ Assert.assertEquals(byteBuffer, future2.get(2, TimeUnit.SECONDS));
+ Assert.assertNull(future2.getError());
+ }
+
+ @Test()
+ public void error() throws IOException, InterruptedException, TimeoutException {
+ // Test synchronous RPC:
+ try {
+ simpleClient.error();
+ Assert.fail("Expected " + TestError.class.getCanonicalName());
+ } catch (TestError e) {
+ // Expected
+ } catch (AvroRemoteException e) {
+ e.printStackTrace();
+ Assert.fail("Unexpected error: " + e.toString());
+ }
+
+ // Test asynchronous RPC (future):
+ CallFuture<Void> future = new CallFuture<Void>();
+ simpleClient.error(future);
+ try {
+ future.get(2, TimeUnit.SECONDS);
+ Assert.fail("Expected " + TestError.class.getCanonicalName() + " to be thrown");
+ } catch (ExecutionException e) {
+ Assert.assertTrue("Expected " + TestError.class.getCanonicalName(),
+ e.getCause() instanceof TestError);
+ }
+ Assert.assertNotNull(future.getError());
+ Assert.assertTrue("Expected " + TestError.class.getCanonicalName(),
+ future.getError() instanceof TestError);
+ Assert.assertNull(future.getResult());
+
+ // Test asynchronous RPC (callback):
+ final CountDownLatch latch = new CountDownLatch(1);
+ final AtomicReference<Throwable> errorRef = new AtomicReference<Throwable>();
+ simpleClient.error(new Callback<Void>() {
+ @Override
+ public void handleResult(Void result) {
+ Assert.fail("Expected " + TestError.class.getCanonicalName());
+ }
+ @Override
+ public void handleError(Throwable error) {
+ errorRef.set(error);
+ latch.countDown();
+ }
+ });
+ Assert.assertTrue("Timed out waiting for error", latch.await(2, TimeUnit.SECONDS));
+ Assert.assertNotNull(errorRef.get());
+ Assert.assertTrue(errorRef.get() instanceof TestError);
+ }
+
+ @Test
+ public void ack() throws Exception {
+ simpleClient.ack();
+ ackLatch.get().await(2, TimeUnit.SECONDS);
+ Assert.assertTrue("Expected ack flag to be set", ackFlag.get());
+
+ ackLatch.set(new CountDownLatch(1));
+ simpleClient.ack();
+ ackLatch.get().await(2, TimeUnit.SECONDS);
+ Assert.assertFalse("Expected ack flag to be cleared", ackFlag.get());
+ }
+
+ @Ignore
+ @Test
+ public void performanceTest() throws Exception {
+ final int threadCount = 8;
+ final long runTimeMillis = 10 * 1000L;
+ ExecutorService threadPool = Executors.newFixedThreadPool(threadCount);
+
+ System.out.println("Running performance test for " + runTimeMillis + "ms...");
+ final AtomicLong rpcCount = new AtomicLong(0L);
+ final AtomicBoolean runFlag = new AtomicBoolean(true);
+ final CountDownLatch startLatch = new CountDownLatch(threadCount);
+ for (int ii = 0; ii < threadCount; ii++) {
+ threadPool.submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ startLatch.countDown();
+ startLatch.await(2, TimeUnit.SECONDS);
+ while (runFlag.get()) {
+ rpcCount.incrementAndGet();
+ Assert.assertEquals(new Utf8("Hello, World!"), simpleClient.hello("World!"));
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ }
+
+ startLatch.await(2, TimeUnit.SECONDS);
+ Thread.sleep(runTimeMillis);
+ runFlag.set(false);
+ threadPool.shutdown();
+ Assert.assertTrue("Timed out shutting down thread pool", threadPool.awaitTermination(2, TimeUnit.SECONDS));
+ System.out.println("Completed " + rpcCount.get() + " RPCs in " + runTimeMillis +
+ "ms => " + (((double)rpcCount.get() / (double)runTimeMillis) * 1000) + " RPCs/sec, " +
+ ((double)runTimeMillis / (double)rpcCount.get()) + " ms/RPC.");
+ }
+
+ /**
+ * Implementation of the Simple interface.
+ */
+ private static class SimpleImpl implements Simple {
+ private final AtomicBoolean ackFlag;
+
+ /**
+ * Creates a SimpleImpl.
+ * @param ackFlag the AtomicBoolean to toggle when ack() is called.
+ */
+ public SimpleImpl(final AtomicBoolean ackFlag) {
+ this.ackFlag = ackFlag;
+ }
+
+ @Override
+ public CharSequence hello(CharSequence greeting) throws AvroRemoteException {
+ return "Hello, " + greeting;
+ }
+
+ @Override
+ public TestRecord echo(TestRecord record) throws AvroRemoteException {
+ return record;
+ }
+
+ @Override
+ public int add(int arg1, int arg2) throws AvroRemoteException {
+ return arg1 + arg2;
+ }
+
+ @Override
+ public ByteBuffer echoBytes(ByteBuffer data) throws AvroRemoteException {
+ return data;
+ }
+
+ @Override
+ public Void error() throws AvroRemoteException, TestError {
+ TestError error = new TestError();
+ error.message = "Test Message";
+ throw error;
+ }
+
+ @Override
+ synchronized public void ack() {
+ ackFlag.set(!ackFlag.get());
+ ackLatch.get().countDown();
+ }
+ }
+}